Hi guys,
while creating the 0.6-incubating release I noticed that often build issues are triggered by changing dependencies. In particular we allow users to set the version of the Hadoop dependency. Right now, we test the following variants: (oraclejdk8, oraclejdk7, openjdk6) x (hadoop 1.2.1, hadoop 2.2.0) Accidentially, I found out that the recently merged streaming component does not build with hadoop 2.4.0 as a dependency ( https://issues.apache.org/jira/browse/FLINK-1065). I'm suggesting to add the following versions into the pool of Hadoop versions we test against: 1) "hadoop 2.0.0-alpha" 2 "hadoop 2.5.0" 1) is going to be the replacement for the "cdh4" package, and I think we should test versions we are going to ship with releases. ( https://issues.apache.org/jira/browse/FLINK-1068) 2) is the current stable hadoop version. I think we should test against hadoop 2.2.0 and the latest stable hadoop version. Adding these two versions would result in 3x4 = 12 builds per push / pull request, which is a lot given that we can only run 5 tests in parallel. Therefore, I'm suggesting to add just 2 builds with "oraclejdk8" and the two new hadoop versions. Opinions? -- Robert |
Thanks for reporting the issue, I'll pick it up soon.
I would not support to have 12 builds, as it would take ages -> +1 for the suggestion on adding just 2 builds with "oraclejdk8" and the two new hadoop versions. On Wed, Aug 27, 2014 at 11:40 AM, Robert Metzger <[hidden email]> wrote: > Hi guys, > > while creating the 0.6-incubating release I noticed that often build issues > are triggered by changing dependencies. > In particular we allow users to set the version of the Hadoop dependency. > > Right now, we test the following variants: > > (oraclejdk8, oraclejdk7, openjdk6) x (hadoop 1.2.1, hadoop 2.2.0) > > Accidentially, I found out that the recently merged streaming component > does not build with hadoop 2.4.0 as a dependency ( > https://issues.apache.org/jira/browse/FLINK-1065). > > I'm suggesting to add the following versions into the pool of Hadoop > versions we test against: > 1) "hadoop 2.0.0-alpha" > 2 "hadoop 2.5.0" > > 1) is going to be the replacement for the "cdh4" package, and I think we > should test versions we are going to ship with releases. ( > https://issues.apache.org/jira/browse/FLINK-1068) > 2) is the current stable hadoop version. I think we should test against > hadoop 2.2.0 and the latest stable hadoop version. > > Adding these two versions would result in 3x4 = 12 builds per push / pull > request, which is a lot given that we can only run 5 tests in parallel. > Therefore, I'm suggesting to add just 2 builds with "oraclejdk8" and the > two new hadoop versions. > > Opinions? > > > -- Robert > |
In reply to this post by Robert Metzger
In general, I agree that we should add further variants.
To make sure that I understand you correctly: We keep the current 6 variants (oraclejdk8, oraclejdk7, openjdk6) x (hadoop 1.2.1, hadoop 2.2.0) and add 2 variants (oraclejdk8) x (hadoop 2.0.0-alpha, hadoop 2.5.0), resulting in 8 instead of the "full" 12? Assuming that one (jdk) x (hadoop) variant takes ~20 mins, we would save 20 mins before we have a confirmation for good builds. Since bad builds will probably fail earlier anyways, I vote to go for the full matrix. In any case we wouldn't loose anything by adding a further JDK to the new variants as there should be not a big time difference between 10 and 8 variants (assuming that the 5 parallel builds take roughly the same amount of time). On Wed, Aug 27, 2014 at 11:40 AM, Robert Metzger <[hidden email]> wrote: > Hi guys, > > while creating the 0.6-incubating release I noticed that often build issues > are triggered by changing dependencies. > In particular we allow users to set the version of the Hadoop dependency. > > Right now, we test the following variants: > > (oraclejdk8, oraclejdk7, openjdk6) x (hadoop 1.2.1, hadoop 2.2.0) > > Accidentially, I found out that the recently merged streaming component > does not build with hadoop 2.4.0 as a dependency ( > https://issues.apache.org/jira/browse/FLINK-1065). > > I'm suggesting to add the following versions into the pool of Hadoop > versions we test against: > 1) "hadoop 2.0.0-alpha" > 2 "hadoop 2.5.0" > > 1) is going to be the replacement for the "cdh4" package, and I think we > should test versions we are going to ship with releases. ( > https://issues.apache.org/jira/browse/FLINK-1068) > 2) is the current stable hadoop version. I think we should test against > hadoop 2.2.0 and the latest stable hadoop version. > > Adding these two versions would result in 3x4 = 12 builds per push / pull > request, which is a lot given that we can only run 5 tests in parallel. > Therefore, I'm suggesting to add just 2 builds with "oraclejdk8" and the > two new hadoop versions. > > Opinions? > > > -- Robert > |
In reply to this post by Robert Metzger
I think the most important thing is building at least once against the
4 Hadoop versions, and building at least once against the 3 JDK versions. It's very unlikely that a particular JDK + Hadoop version fails to compile, while the same JDK with another Hadoop, or the same Hadoop with another JDK, does. I think you could get away with 4: 1.2.1 - 6 2.0.0-alpha - 6 2.2.0 - 7 2.5.0 - 8 These at least pairs old JDK with old Hadoop. I am not sure Hadoop < 2.2 even reliably works with Java 7, for example? testing Java 8 + Hadoop 1.2.1 is probably pointless, for example. You can add back a few more pairs here and there if this feels too sparse. On Wed, Aug 27, 2014 at 10:40 AM, Robert Metzger <[hidden email]> wrote: > Hi guys, > > while creating the 0.6-incubating release I noticed that often build issues > are triggered by changing dependencies. > In particular we allow users to set the version of the Hadoop dependency. > > Right now, we test the following variants: > > (oraclejdk8, oraclejdk7, openjdk6) x (hadoop 1.2.1, hadoop 2.2.0) > > Accidentially, I found out that the recently merged streaming component > does not build with hadoop 2.4.0 as a dependency ( > https://issues.apache.org/jira/browse/FLINK-1065). > > I'm suggesting to add the following versions into the pool of Hadoop > versions we test against: > 1) "hadoop 2.0.0-alpha" > 2 "hadoop 2.5.0" > > 1) is going to be the replacement for the "cdh4" package, and I think we > should test versions we are going to ship with releases. ( > https://issues.apache.org/jira/browse/FLINK-1068) > 2) is the current stable hadoop version. I think we should test against > hadoop 2.2.0 and the latest stable hadoop version. > > Adding these two versions would result in 3x4 = 12 builds per push / pull > request, which is a lot given that we can only run 5 tests in parallel. > Therefore, I'm suggesting to add just 2 builds with "oraclejdk8" and the > two new hadoop versions. > > Opinions? > > > -- Robert |
Sounds reasonable.
Since Travis runs 5 concurrent builds, we can add one more without adding extra time. I would suggest to add (1.2.1 - Java 7) -> I would suspect that to be still used quite a bit in that combination. Stephan On Wed, Aug 27, 2014 at 12:06 PM, Sean Owen <[hidden email]> wrote: > I think the most important thing is building at least once against the > 4 Hadoop versions, and building at least once against the 3 JDK > versions. It's very unlikely that a particular JDK + Hadoop version > fails to compile, while the same JDK with another Hadoop, or the same > Hadoop with another JDK, does. > > I think you could get away with 4: > > 1.2.1 - 6 > 2.0.0-alpha - 6 > 2.2.0 - 7 > 2.5.0 - 8 > > These at least pairs old JDK with old Hadoop. I am not sure Hadoop < > 2.2 even reliably works with Java 7, for example? testing Java 8 + > Hadoop 1.2.1 is probably pointless, for example. > > You can add back a few more pairs here and there if this feels too sparse. > > > On Wed, Aug 27, 2014 at 10:40 AM, Robert Metzger <[hidden email]> > wrote: > > Hi guys, > > > > while creating the 0.6-incubating release I noticed that often build > issues > > are triggered by changing dependencies. > > In particular we allow users to set the version of the Hadoop dependency. > > > > Right now, we test the following variants: > > > > (oraclejdk8, oraclejdk7, openjdk6) x (hadoop 1.2.1, hadoop 2.2.0) > > > > Accidentially, I found out that the recently merged streaming component > > does not build with hadoop 2.4.0 as a dependency ( > > https://issues.apache.org/jira/browse/FLINK-1065). > > > > I'm suggesting to add the following versions into the pool of Hadoop > > versions we test against: > > 1) "hadoop 2.0.0-alpha" > > 2 "hadoop 2.5.0" > > > > 1) is going to be the replacement for the "cdh4" package, and I think we > > should test versions we are going to ship with releases. ( > > https://issues.apache.org/jira/browse/FLINK-1068) > > 2) is the current stable hadoop version. I think we should test against > > hadoop 2.2.0 and the latest stable hadoop version. > > > > Adding these two versions would result in 3x4 = 12 builds per push / pull > > request, which is a lot given that we can only run 5 tests in parallel. > > Therefore, I'm suggesting to add just 2 builds with "oraclejdk8" and the > > two new hadoop versions. > > > > Opinions? > > > > > > -- Robert > |
@Sean: We are able to build Hadoop 1.2.1 with Java 8 because we are always
setting the compilation level to Java 6. I can remember two instances where we found java specific issues: 1) The javadocs of Java 8 are checking the correctness of the HTML codes in the source comments, so our java 8 builds failed. 2) Oracle's Java 6 has a compiler bug that leads to null pointer exceptions during compilations. Both issues were discovered independent of the Hadoop version, so we should be fine. I'm probably going for 5 parallel builds just because it will not add any additional waiting time. Thank you for the feedback. I'll try and see how I can configure travis to reflect these build settings: https://issues.apache.org/jira/browse/FLINK-1072. -- Robert On Wed, Aug 27, 2014 at 4:31 PM, Stephan Ewen <[hidden email]> wrote: > Sounds reasonable. > > Since Travis runs 5 concurrent builds, we can add one more without adding > extra time. > > I would suggest to add (1.2.1 - Java 7) -> I would suspect that to be > still used quite a bit in that combination. > > Stephan > > > > On Wed, Aug 27, 2014 at 12:06 PM, Sean Owen <[hidden email]> wrote: > > > I think the most important thing is building at least once against the > > 4 Hadoop versions, and building at least once against the 3 JDK > > versions. It's very unlikely that a particular JDK + Hadoop version > > fails to compile, while the same JDK with another Hadoop, or the same > > Hadoop with another JDK, does. > > > > I think you could get away with 4: > > > > 1.2.1 - 6 > > 2.0.0-alpha - 6 > > 2.2.0 - 7 > > 2.5.0 - 8 > > > > These at least pairs old JDK with old Hadoop. I am not sure Hadoop < > > 2.2 even reliably works with Java 7, for example? testing Java 8 + > > Hadoop 1.2.1 is probably pointless, for example. > > > > You can add back a few more pairs here and there if this feels too > sparse. > > > > > > On Wed, Aug 27, 2014 at 10:40 AM, Robert Metzger <[hidden email]> > > wrote: > > > Hi guys, > > > > > > while creating the 0.6-incubating release I noticed that often build > > issues > > > are triggered by changing dependencies. > > > In particular we allow users to set the version of the Hadoop > dependency. > > > > > > Right now, we test the following variants: > > > > > > (oraclejdk8, oraclejdk7, openjdk6) x (hadoop 1.2.1, hadoop 2.2.0) > > > > > > Accidentially, I found out that the recently merged streaming component > > > does not build with hadoop 2.4.0 as a dependency ( > > > https://issues.apache.org/jira/browse/FLINK-1065). > > > > > > I'm suggesting to add the following versions into the pool of Hadoop > > > versions we test against: > > > 1) "hadoop 2.0.0-alpha" > > > 2 "hadoop 2.5.0" > > > > > > 1) is going to be the replacement for the "cdh4" package, and I think > we > > > should test versions we are going to ship with releases. ( > > > https://issues.apache.org/jira/browse/FLINK-1068) > > > 2) is the current stable hadoop version. I think we should test against > > > hadoop 2.2.0 and the latest stable hadoop version. > > > > > > Adding these two versions would result in 3x4 = 12 builds per push / > pull > > > request, which is a lot given that we can only run 5 tests in parallel. > > > Therefore, I'm suggesting to add just 2 builds with "oraclejdk8" and > the > > > two new hadoop versions. > > > > > > Opinions? > > > > > > > > > -- Robert > > > |
Sure, if you can see a good reason to add 1-2 more, go for it. Yes you
certainly want to cover all of these Hadoop and JDK versions at least once for the reasons you give. PS you won't catch, for example, code that uses Java 7+ classes or methods just because the language level is set to 6, unless you set bootclasspath with Maven. Cross compiling is a little bit dangerous that way. But you're covered since you have at least one Java 6-only build, that is actually run with with JDK 6. On Wed, Aug 27, 2014 at 6:20 PM, Robert Metzger <[hidden email]> wrote: > @Sean: We are able to build Hadoop 1.2.1 with Java 8 because we are always > setting the compilation level to Java 6. > > I can remember two instances where we found java specific issues: > 1) The javadocs of Java 8 are checking the correctness of the HTML codes in > the source comments, so our java 8 builds failed. > 2) Oracle's Java 6 has a compiler bug that leads to null pointer exceptions > during compilations. > Both issues were discovered independent of the Hadoop version, so we should > be fine. > > I'm probably going for 5 parallel builds just because it will not add any > additional waiting time. > > Thank you for the feedback. I'll try and see how I can configure travis to > reflect these build settings: > https://issues.apache.org/jira/browse/FLINK-1072. > > -- Robert > > > On Wed, Aug 27, 2014 at 4:31 PM, Stephan Ewen <[hidden email]> wrote: > >> Sounds reasonable. >> >> Since Travis runs 5 concurrent builds, we can add one more without adding >> extra time. >> >> I would suggest to add (1.2.1 - Java 7) -> I would suspect that to be >> still used quite a bit in that combination. >> >> Stephan >> >> >> >> On Wed, Aug 27, 2014 at 12:06 PM, Sean Owen <[hidden email]> wrote: >> >> > I think the most important thing is building at least once against the >> > 4 Hadoop versions, and building at least once against the 3 JDK >> > versions. It's very unlikely that a particular JDK + Hadoop version >> > fails to compile, while the same JDK with another Hadoop, or the same >> > Hadoop with another JDK, does. >> > >> > I think you could get away with 4: >> > >> > 1.2.1 - 6 >> > 2.0.0-alpha - 6 >> > 2.2.0 - 7 >> > 2.5.0 - 8 >> > >> > These at least pairs old JDK with old Hadoop. I am not sure Hadoop < >> > 2.2 even reliably works with Java 7, for example? testing Java 8 + >> > Hadoop 1.2.1 is probably pointless, for example. >> > >> > You can add back a few more pairs here and there if this feels too >> sparse. >> > >> > >> > On Wed, Aug 27, 2014 at 10:40 AM, Robert Metzger <[hidden email]> >> > wrote: >> > > Hi guys, >> > > >> > > while creating the 0.6-incubating release I noticed that often build >> > issues >> > > are triggered by changing dependencies. >> > > In particular we allow users to set the version of the Hadoop >> dependency. >> > > >> > > Right now, we test the following variants: >> > > >> > > (oraclejdk8, oraclejdk7, openjdk6) x (hadoop 1.2.1, hadoop 2.2.0) >> > > >> > > Accidentially, I found out that the recently merged streaming component >> > > does not build with hadoop 2.4.0 as a dependency ( >> > > https://issues.apache.org/jira/browse/FLINK-1065). >> > > >> > > I'm suggesting to add the following versions into the pool of Hadoop >> > > versions we test against: >> > > 1) "hadoop 2.0.0-alpha" >> > > 2 "hadoop 2.5.0" >> > > >> > > 1) is going to be the replacement for the "cdh4" package, and I think >> we >> > > should test versions we are going to ship with releases. ( >> > > https://issues.apache.org/jira/browse/FLINK-1068) >> > > 2) is the current stable hadoop version. I think we should test against >> > > hadoop 2.2.0 and the latest stable hadoop version. >> > > >> > > Adding these two versions would result in 3x4 = 12 builds per push / >> pull >> > > request, which is a lot given that we can only run 5 tests in parallel. >> > > Therefore, I'm suggesting to add just 2 builds with "oraclejdk8" and >> the >> > > two new hadoop versions. >> > > >> > > Opinions? >> > > >> > > >> > > -- Robert >> > >> |
Hello everyone,
This will be some kind of brainstorming question. As some of you may know I am currently working on the Python API. The most crucial part here is how the data is exchanged between Java and Python. Up to this point we used pipes for this, but switched recently to memory mapped files in hopes of increasing the (lacking) performance. Early (simplified) prototypes (outside of Flink) showed that this would yield a significant increase. yet when i added the code to flink and ran a job, there was no effect. like at all. two radically different schemes ran in /exactly/ the same time. my conclusion was that code already in place (and not part of the prototypes) is responsible for this. so i went ahead and modified the prototypes to use all relevant code from the Python API in order to narrow down the culprit. but this time, the performance increase was there. Now here's the question: How can the /very same code/ perform so much worse when integrated into flink? if the code is not the problem, what could be it? i spent a lot of time looking for that one line of code that cripples the performance, but I'm pretty much out of places to look. |
Hi Chesnay!
That is an interesting problem, though hard to judge with the information we have. Can you elaborate a bit on the following points: - When putting the objects from the Java Flink side into the shared memory, you need to serialize them. How do you do that? Into a buffer, then copy that into the shared memory ByteBuffer? Directly? - Shared memory access has to be somehow controlled. The pipes give you flow control for free (blocking write calls when the stream consumer is busy). What do you do for the shared memory? Usually, one uses semaphores, or, in java File(Range)Locks to coordinate access and block until memory regions are made available. Can you check if there are some busy waiting parts in you code? - More general: The code is slower, but does it burn CPU cycles in its slowness or is it waiting for locks / monitors / conditions ? Stephan On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler < [hidden email]> wrote: > Hello everyone, > > This will be some kind of brainstorming question. > > As some of you may know I am currently working on the Python API. The most > crucial part here is how the data is exchanged between Java and Python. > Up to this point we used pipes for this, but switched recently to memory > mapped files in hopes of increasing the (lacking) performance. > > Early (simplified) prototypes (outside of Flink) showed that this would > yield a significant increase. yet when i added the code to flink and ran a > job, there was > no effect. like at all. two radically different schemes ran in /exactly/ > the same time. > > my conclusion was that code already in place (and not part of the > prototypes) is responsible for this. > so i went ahead and modified the prototypes to use all relevant code from > the Python API in order to narrow down the culprit. but this time, the > performance increase was there. > > Now here's the question: How can the /very same code/ perform so much > worse when integrated into flink? if the code is not the problem, what > could be it? > > i spent a lot of time looking for that one line of code that cripples the > performance, but I'm pretty much out of places to look. > > |
Hey Stephan,
I'd like to point out right away that the code related to your questions is shared by both programs. regarding your first point: i have a byte[] into which i serialize the data first using a ByteBuffer, and then write that data to a MappedByteBuffer. regarding synchronization: i couldn't find a way to use elaborate things like semaphores or similar that work between python and java alike. the data exchange is currently completely synchronous. java writes a record, sets an "isWritten" bit and then repeatedly checks this bit whether it is 0. python repeatedly checks this bit whether it is 1. once that happens, it reads the record, sets the bit to 0 which tells java that it has read the record and can write the next one. this scheme works the same way the other way around. *NOW,* this may seem ... inefficient, to put it slightly. it is (or rather should be...) way faster (5x) that what we had so far though (asynchronous pipes). (i also tried different schemes that all had no effect, so i decided to stick with the easiest one) on to your last point: I'm gonna check for that tomorrow. On 27.8.2014 20:45, Stephan Ewen wrote: > Hi Chesnay! > > That is an interesting problem, though hard to judge with the information > we have. > > Can you elaborate a bit on the following points: > > - When putting the objects from the Java Flink side into the shared > memory, you need to serialize them. How do you do that? Into a buffer, then > copy that into the shared memory ByteBuffer? Directly? > > - Shared memory access has to be somehow controlled. The pipes give you > flow control for free (blocking write calls when the stream consumer is > busy). What do you do for the shared memory? Usually, one uses semaphores, > or, in java File(Range)Locks to coordinate access and block until memory > regions are made available. Can you check if there are some busy waiting > parts in you code? > > - More general: The code is slower, but does it burn CPU cycles in its > slowness or is it waiting for locks / monitors / conditions ? > > Stephan > > > > On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler < > [hidden email]> wrote: > >> Hello everyone, >> >> This will be some kind of brainstorming question. >> >> As some of you may know I am currently working on the Python API. The most >> crucial part here is how the data is exchanged between Java and Python. >> Up to this point we used pipes for this, but switched recently to memory >> mapped files in hopes of increasing the (lacking) performance. >> >> Early (simplified) prototypes (outside of Flink) showed that this would >> yield a significant increase. yet when i added the code to flink and ran a >> job, there was >> no effect. like at all. two radically different schemes ran in /exactly/ >> the same time. >> >> my conclusion was that code already in place (and not part of the >> prototypes) is responsible for this. >> so i went ahead and modified the prototypes to use all relevant code from >> the Python API in order to narrow down the culprit. but this time, the >> performance increase was there. >> >> Now here's the question: How can the /very same code/ perform so much >> worse when integrated into flink? if the code is not the problem, what >> could be it? >> >> i spent a lot of time looking for that one line of code that cripples the >> performance, but I'm pretty much out of places to look. >> >> |
Hey Chesnay!
Here are some thoughts: - The repeated checking for 1 or 0 is indeed a busy loop. These may behave very different in different settings. If you run the code isolated, you have a spare core for the thread and it barely hurts. Run multiple parallel instances in a larger framework, and it eats away CPU cycles from the threads that do the work - it starts hurting badly. - You may get around a copy into the shared memory (ByteBuffer into MemoryMappedFile) by creating an according DataOutputView - save one more data copy. That's the next step, though, first solve the other issue. The last time I implemented such an inter-process data pipe between languages, I had a similar issue: No support for system wide semaphores (or something similar) on both sides. I used Shared memory for the buffers, and a local network socket (UDP, but I guess TCP would be fine as well) for notifications when buffers are available. That worked pretty well, yielded high throughput, because the big buffers were not copied (unlike in streams), and the UDP notifications were very fast (fire and forget datagrams). Stephan On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler < [hidden email]> wrote: > Hey Stephan, > > I'd like to point out right away that the code related to your questions > is shared by both programs. > > regarding your first point: i have a byte[] into which i serialize the > data first using a ByteBuffer, and then write that data to a > MappedByteBuffer. > > regarding synchronization: i couldn't find a way to use elaborate things > like semaphores or similar that work between python and java alike. > > the data exchange is currently completely synchronous. java writes a > record, sets an "isWritten" bit and then repeatedly checks this bit whether > it is 0. python repeatedly checks this bit whether it is 1. once that > happens, it reads the record, sets the bit to 0 which tells java that it > has read the record and can write the next one. this scheme works the same > way the other way around. > > *NOW,* this may seem ... inefficient, to put it slightly. it is (or rather > should be...) way faster (5x) that what we had so far though (asynchronous > pipes). > (i also tried different schemes that all had no effect, so i decided to > stick with the easiest one) > > on to your last point: I'm gonna check for that tomorrow. > > > > > On 27.8.2014 20:45, Stephan Ewen wrote: > >> Hi Chesnay! >> >> That is an interesting problem, though hard to judge with the information >> we have. >> >> Can you elaborate a bit on the following points: >> >> - When putting the objects from the Java Flink side into the shared >> memory, you need to serialize them. How do you do that? Into a buffer, >> then >> copy that into the shared memory ByteBuffer? Directly? >> >> - Shared memory access has to be somehow controlled. The pipes give you >> flow control for free (blocking write calls when the stream consumer is >> busy). What do you do for the shared memory? Usually, one uses semaphores, >> or, in java File(Range)Locks to coordinate access and block until memory >> regions are made available. Can you check if there are some busy waiting >> parts in you code? >> >> - More general: The code is slower, but does it burn CPU cycles in its >> slowness or is it waiting for locks / monitors / conditions ? >> >> Stephan >> >> >> >> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler < >> [hidden email]> wrote: >> >> Hello everyone, >>> >>> This will be some kind of brainstorming question. >>> >>> As some of you may know I am currently working on the Python API. The >>> most >>> crucial part here is how the data is exchanged between Java and Python. >>> Up to this point we used pipes for this, but switched recently to memory >>> mapped files in hopes of increasing the (lacking) performance. >>> >>> Early (simplified) prototypes (outside of Flink) showed that this would >>> yield a significant increase. yet when i added the code to flink and ran >>> a >>> job, there was >>> no effect. like at all. two radically different schemes ran in /exactly/ >>> the same time. >>> >>> my conclusion was that code already in place (and not part of the >>> prototypes) is responsible for this. >>> so i went ahead and modified the prototypes to use all relevant code from >>> the Python API in order to narrow down the culprit. but this time, the >>> performance increase was there. >>> >>> Now here's the question: How can the /very same code/ perform so much >>> worse when integrated into flink? if the code is not the problem, what >>> could be it? >>> >>> i spent a lot of time looking for that one line of code that cripples the >>> performance, but I'm pretty much out of places to look. >>> >>> >>> > |
the performance differences occur on the same system (16GB, 4 cores +
HyperThreading) with a DOP of 1 for a plan consisting of a single operator. plenty of resources :/ On 28.8.2014 0:50, Stephan Ewen wrote: > Hey Chesnay! > > Here are some thoughts: > > - The repeated checking for 1 or 0 is indeed a busy loop. These may behave > very different in different settings. If you run the code isolated, you > have a spare core for the thread and it barely hurts. Run multiple parallel > instances in a larger framework, and it eats away CPU cycles from the > threads that do the work - it starts hurting badly. > > - You may get around a copy into the shared memory (ByteBuffer into > MemoryMappedFile) by creating an according DataOutputView - save one more > data copy. That's the next step, though, first solve the other issue. > > The last time I implemented such an inter-process data pipe between > languages, I had a similar issue: No support for system wide semaphores (or > something similar) on both sides. > > I used Shared memory for the buffers, and a local network socket (UDP, but > I guess TCP would be fine as well) for notifications when buffers are > available. That worked pretty well, yielded high throughput, because the > big buffers were not copied (unlike in streams), and the UDP notifications > were very fast (fire and forget datagrams). > > Stephan > > > > On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler < > [hidden email]> wrote: > >> Hey Stephan, >> >> I'd like to point out right away that the code related to your questions >> is shared by both programs. >> >> regarding your first point: i have a byte[] into which i serialize the >> data first using a ByteBuffer, and then write that data to a >> MappedByteBuffer. >> >> regarding synchronization: i couldn't find a way to use elaborate things >> like semaphores or similar that work between python and java alike. >> >> the data exchange is currently completely synchronous. java writes a >> record, sets an "isWritten" bit and then repeatedly checks this bit whether >> it is 0. python repeatedly checks this bit whether it is 1. once that >> happens, it reads the record, sets the bit to 0 which tells java that it >> has read the record and can write the next one. this scheme works the same >> way the other way around. >> >> *NOW,* this may seem ... inefficient, to put it slightly. it is (or rather >> should be...) way faster (5x) that what we had so far though (asynchronous >> pipes). >> (i also tried different schemes that all had no effect, so i decided to >> stick with the easiest one) >> >> on to your last point: I'm gonna check for that tomorrow. >> >> >> >> >> On 27.8.2014 20:45, Stephan Ewen wrote: >> >>> Hi Chesnay! >>> >>> That is an interesting problem, though hard to judge with the information >>> we have. >>> >>> Can you elaborate a bit on the following points: >>> >>> - When putting the objects from the Java Flink side into the shared >>> memory, you need to serialize them. How do you do that? Into a buffer, >>> then >>> copy that into the shared memory ByteBuffer? Directly? >>> >>> - Shared memory access has to be somehow controlled. The pipes give you >>> flow control for free (blocking write calls when the stream consumer is >>> busy). What do you do for the shared memory? Usually, one uses semaphores, >>> or, in java File(Range)Locks to coordinate access and block until memory >>> regions are made available. Can you check if there are some busy waiting >>> parts in you code? >>> >>> - More general: The code is slower, but does it burn CPU cycles in its >>> slowness or is it waiting for locks / monitors / conditions ? >>> >>> Stephan >>> >>> >>> >>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler < >>> [hidden email]> wrote: >>> >>> Hello everyone, >>>> This will be some kind of brainstorming question. >>>> >>>> As some of you may know I am currently working on the Python API. The >>>> most >>>> crucial part here is how the data is exchanged between Java and Python. >>>> Up to this point we used pipes for this, but switched recently to memory >>>> mapped files in hopes of increasing the (lacking) performance. >>>> >>>> Early (simplified) prototypes (outside of Flink) showed that this would >>>> yield a significant increase. yet when i added the code to flink and ran >>>> a >>>> job, there was >>>> no effect. like at all. two radically different schemes ran in /exactly/ >>>> the same time. >>>> >>>> my conclusion was that code already in place (and not part of the >>>> prototypes) is responsible for this. >>>> so i went ahead and modified the prototypes to use all relevant code from >>>> the Python API in order to narrow down the culprit. but this time, the >>>> performance increase was there. >>>> >>>> Now here's the question: How can the /very same code/ perform so much >>>> worse when integrated into flink? if the code is not the problem, what >>>> could be it? >>>> >>>> i spent a lot of time looking for that one line of code that cripples the >>>> performance, but I'm pretty much out of places to look. >>>> >>>> >>>> |
In reply to this post by Sean Owen
Oh yes, you are right. I have not thought about this, but its indeed a nice
way to oversee wrong API usage. I'm going to use a JDK6 to run the hadoop2 profile, because some modules are disabled on the hadoop1 profile (hbase and yarn). This ensure that we have at least one build with all modules on an actual JDK6. I'm going to work on this issue at a later point. I've the impression that I was working a bit too much on the build / infrastructure side recently (I haven't seen much Java code in the last three days :( ) On Wed, Aug 27, 2014 at 7:58 PM, Sean Owen <[hidden email]> wrote: > Sure, if you can see a good reason to add 1-2 more, go for it. Yes you > certainly want to cover all of these Hadoop and JDK versions at least > once for the reasons you give. > > PS you won't catch, for example, code that uses Java 7+ classes or > methods just because the language level is set to 6, unless you set > bootclasspath with Maven. Cross compiling is a little bit dangerous > that way. But you're covered since you have at least one Java 6-only > build, that is actually run with with JDK 6. > > On Wed, Aug 27, 2014 at 6:20 PM, Robert Metzger <[hidden email]> > wrote: > > @Sean: We are able to build Hadoop 1.2.1 with Java 8 because we are > always > > setting the compilation level to Java 6. > > > > I can remember two instances where we found java specific issues: > > 1) The javadocs of Java 8 are checking the correctness of the HTML codes > in > > the source comments, so our java 8 builds failed. > > 2) Oracle's Java 6 has a compiler bug that leads to null pointer > exceptions > > during compilations. > > Both issues were discovered independent of the Hadoop version, so we > should > > be fine. > > > > I'm probably going for 5 parallel builds just because it will not add any > > additional waiting time. > > > > Thank you for the feedback. I'll try and see how I can configure travis > to > > reflect these build settings: > > https://issues.apache.org/jira/browse/FLINK-1072. > > > > -- Robert > > > > > > On Wed, Aug 27, 2014 at 4:31 PM, Stephan Ewen <[hidden email]> wrote: > > > >> Sounds reasonable. > >> > >> Since Travis runs 5 concurrent builds, we can add one more without > adding > >> extra time. > >> > >> I would suggest to add (1.2.1 - Java 7) -> I would suspect that to be > >> still used quite a bit in that combination. > >> > >> Stephan > >> > >> > >> > >> On Wed, Aug 27, 2014 at 12:06 PM, Sean Owen <[hidden email]> wrote: > >> > >> > I think the most important thing is building at least once against the > >> > 4 Hadoop versions, and building at least once against the 3 JDK > >> > versions. It's very unlikely that a particular JDK + Hadoop version > >> > fails to compile, while the same JDK with another Hadoop, or the same > >> > Hadoop with another JDK, does. > >> > > >> > I think you could get away with 4: > >> > > >> > 1.2.1 - 6 > >> > 2.0.0-alpha - 6 > >> > 2.2.0 - 7 > >> > 2.5.0 - 8 > >> > > >> > These at least pairs old JDK with old Hadoop. I am not sure Hadoop < > >> > 2.2 even reliably works with Java 7, for example? testing Java 8 + > >> > Hadoop 1.2.1 is probably pointless, for example. > >> > > >> > You can add back a few more pairs here and there if this feels too > >> sparse. > >> > > >> > > >> > On Wed, Aug 27, 2014 at 10:40 AM, Robert Metzger <[hidden email] > > > >> > wrote: > >> > > Hi guys, > >> > > > >> > > while creating the 0.6-incubating release I noticed that often build > >> > issues > >> > > are triggered by changing dependencies. > >> > > In particular we allow users to set the version of the Hadoop > >> dependency. > >> > > > >> > > Right now, we test the following variants: > >> > > > >> > > (oraclejdk8, oraclejdk7, openjdk6) x (hadoop 1.2.1, hadoop 2.2.0) > >> > > > >> > > Accidentially, I found out that the recently merged streaming > component > >> > > does not build with hadoop 2.4.0 as a dependency ( > >> > > https://issues.apache.org/jira/browse/FLINK-1065). > >> > > > >> > > I'm suggesting to add the following versions into the pool of Hadoop > >> > > versions we test against: > >> > > 1) "hadoop 2.0.0-alpha" > >> > > 2 "hadoop 2.5.0" > >> > > > >> > > 1) is going to be the replacement for the "cdh4" package, and I > think > >> we > >> > > should test versions we are going to ship with releases. ( > >> > > https://issues.apache.org/jira/browse/FLINK-1068) > >> > > 2) is the current stable hadoop version. I think we should test > against > >> > > hadoop 2.2.0 and the latest stable hadoop version. > >> > > > >> > > Adding these two versions would result in 3x4 = 12 builds per push / > >> pull > >> > > request, which is a lot given that we can only run 5 tests in > parallel. > >> > > Therefore, I'm suggesting to add just 2 builds with "oraclejdk8" and > >> the > >> > > two new hadoop versions. > >> > > > >> > > Opinions? > >> > > > >> > > > >> > > -- Robert > >> > > >> > |
In reply to this post by Chesnay Schepler
Hey Chesnay,
any progress on this today? Are you going for the UDP buffer availability notifications Stephan proposed instead of the busy loop? Ufuk On Thu, Aug 28, 2014 at 1:06 AM, Chesnay Schepler < [hidden email]> wrote: > the performance differences occur on the same system (16GB, 4 cores + > HyperThreading) with a DOP of 1 for a plan consisting of a single operator. > plenty of resources :/ > > > On 28.8.2014 0:50, Stephan Ewen wrote: > >> Hey Chesnay! >> >> Here are some thoughts: >> >> - The repeated checking for 1 or 0 is indeed a busy loop. These may >> behave >> very different in different settings. If you run the code isolated, you >> have a spare core for the thread and it barely hurts. Run multiple >> parallel >> instances in a larger framework, and it eats away CPU cycles from the >> threads that do the work - it starts hurting badly. >> >> - You may get around a copy into the shared memory (ByteBuffer into >> MemoryMappedFile) by creating an according DataOutputView - save one more >> data copy. That's the next step, though, first solve the other issue. >> >> The last time I implemented such an inter-process data pipe between >> languages, I had a similar issue: No support for system wide semaphores >> (or >> something similar) on both sides. >> >> I used Shared memory for the buffers, and a local network socket (UDP, but >> I guess TCP would be fine as well) for notifications when buffers are >> available. That worked pretty well, yielded high throughput, because the >> big buffers were not copied (unlike in streams), and the UDP notifications >> were very fast (fire and forget datagrams). >> >> Stephan >> >> >> >> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler < >> [hidden email]> wrote: >> >> Hey Stephan, >>> >>> I'd like to point out right away that the code related to your questions >>> is shared by both programs. >>> >>> regarding your first point: i have a byte[] into which i serialize the >>> data first using a ByteBuffer, and then write that data to a >>> MappedByteBuffer. >>> >>> regarding synchronization: i couldn't find a way to use elaborate things >>> like semaphores or similar that work between python and java alike. >>> >>> the data exchange is currently completely synchronous. java writes a >>> record, sets an "isWritten" bit and then repeatedly checks this bit >>> whether >>> it is 0. python repeatedly checks this bit whether it is 1. once that >>> happens, it reads the record, sets the bit to 0 which tells java that it >>> has read the record and can write the next one. this scheme works the >>> same >>> way the other way around. >>> >>> *NOW,* this may seem ... inefficient, to put it slightly. it is (or >>> rather >>> should be...) way faster (5x) that what we had so far though >>> (asynchronous >>> pipes). >>> (i also tried different schemes that all had no effect, so i decided to >>> stick with the easiest one) >>> >>> on to your last point: I'm gonna check for that tomorrow. >>> >>> >>> >>> >>> On 27.8.2014 20:45, Stephan Ewen wrote: >>> >>> Hi Chesnay! >>>> >>>> That is an interesting problem, though hard to judge with the >>>> information >>>> we have. >>>> >>>> Can you elaborate a bit on the following points: >>>> >>>> - When putting the objects from the Java Flink side into the shared >>>> memory, you need to serialize them. How do you do that? Into a buffer, >>>> then >>>> copy that into the shared memory ByteBuffer? Directly? >>>> >>>> - Shared memory access has to be somehow controlled. The pipes give >>>> you >>>> flow control for free (blocking write calls when the stream consumer is >>>> busy). What do you do for the shared memory? Usually, one uses >>>> semaphores, >>>> or, in java File(Range)Locks to coordinate access and block until memory >>>> regions are made available. Can you check if there are some busy waiting >>>> parts in you code? >>>> >>>> - More general: The code is slower, but does it burn CPU cycles in >>>> its >>>> slowness or is it waiting for locks / monitors / conditions ? >>>> >>>> Stephan >>>> >>>> >>>> >>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler < >>>> [hidden email]> wrote: >>>> >>>> Hello everyone, >>>> >>>>> This will be some kind of brainstorming question. >>>>> >>>>> As some of you may know I am currently working on the Python API. The >>>>> most >>>>> crucial part here is how the data is exchanged between Java and Python. >>>>> Up to this point we used pipes for this, but switched recently to >>>>> memory >>>>> mapped files in hopes of increasing the (lacking) performance. >>>>> >>>>> Early (simplified) prototypes (outside of Flink) showed that this would >>>>> yield a significant increase. yet when i added the code to flink and >>>>> ran >>>>> a >>>>> job, there was >>>>> no effect. like at all. two radically different schemes ran in >>>>> /exactly/ >>>>> the same time. >>>>> >>>>> my conclusion was that code already in place (and not part of the >>>>> prototypes) is responsible for this. >>>>> so i went ahead and modified the prototypes to use all relevant code >>>>> from >>>>> the Python API in order to narrow down the culprit. but this time, the >>>>> performance increase was there. >>>>> >>>>> Now here's the question: How can the /very same code/ perform so much >>>>> worse when integrated into flink? if the code is not the problem, what >>>>> could be it? >>>>> >>>>> i spent a lot of time looking for that one line of code that cripples >>>>> the >>>>> performance, but I'm pretty much out of places to look. >>>>> >>>>> >>>>> >>>>> > |
sorry for the late answer.
today i did a quick hack to replace the synchronization completely with udp. its still synchronous and record based, but 25x slower. regarding busy-loops i would propose the following: 1. leave the python side as it is. its doing most of the heavy lifting anyway and will run at 100% regardless of the loops. (the loops only take up 5% of the total runtime) 2. once we exchange buffers instead of single records the IO operations and synchronization will take a fairly constant time. we could then put the java process to sleep manually for that time instead of waiting. it may not be as good as a blocking operation, but it should keep the cpu consumption down to some extent. On 1.9.2014 22:50, Ufuk Celebi wrote: > Hey Chesnay, > > any progress on this today? Are you going for the UDP buffer availability > notifications Stephan proposed instead of the busy loop? > > Ufuk > > > On Thu, Aug 28, 2014 at 1:06 AM, Chesnay Schepler < > [hidden email]> wrote: > >> the performance differences occur on the same system (16GB, 4 cores + >> HyperThreading) with a DOP of 1 for a plan consisting of a single operator. >> plenty of resources :/ >> >> >> On 28.8.2014 0:50, Stephan Ewen wrote: >> >>> Hey Chesnay! >>> >>> Here are some thoughts: >>> >>> - The repeated checking for 1 or 0 is indeed a busy loop. These may >>> behave >>> very different in different settings. If you run the code isolated, you >>> have a spare core for the thread and it barely hurts. Run multiple >>> parallel >>> instances in a larger framework, and it eats away CPU cycles from the >>> threads that do the work - it starts hurting badly. >>> >>> - You may get around a copy into the shared memory (ByteBuffer into >>> MemoryMappedFile) by creating an according DataOutputView - save one more >>> data copy. That's the next step, though, first solve the other issue. >>> >>> The last time I implemented such an inter-process data pipe between >>> languages, I had a similar issue: No support for system wide semaphores >>> (or >>> something similar) on both sides. >>> >>> I used Shared memory for the buffers, and a local network socket (UDP, but >>> I guess TCP would be fine as well) for notifications when buffers are >>> available. That worked pretty well, yielded high throughput, because the >>> big buffers were not copied (unlike in streams), and the UDP notifications >>> were very fast (fire and forget datagrams). >>> >>> Stephan >>> >>> >>> >>> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler < >>> [hidden email]> wrote: >>> >>> Hey Stephan, >>>> I'd like to point out right away that the code related to your questions >>>> is shared by both programs. >>>> >>>> regarding your first point: i have a byte[] into which i serialize the >>>> data first using a ByteBuffer, and then write that data to a >>>> MappedByteBuffer. >>>> >>>> regarding synchronization: i couldn't find a way to use elaborate things >>>> like semaphores or similar that work between python and java alike. >>>> >>>> the data exchange is currently completely synchronous. java writes a >>>> record, sets an "isWritten" bit and then repeatedly checks this bit >>>> whether >>>> it is 0. python repeatedly checks this bit whether it is 1. once that >>>> happens, it reads the record, sets the bit to 0 which tells java that it >>>> has read the record and can write the next one. this scheme works the >>>> same >>>> way the other way around. >>>> >>>> *NOW,* this may seem ... inefficient, to put it slightly. it is (or >>>> rather >>>> should be...) way faster (5x) that what we had so far though >>>> (asynchronous >>>> pipes). >>>> (i also tried different schemes that all had no effect, so i decided to >>>> stick with the easiest one) >>>> >>>> on to your last point: I'm gonna check for that tomorrow. >>>> >>>> >>>> >>>> >>>> On 27.8.2014 20:45, Stephan Ewen wrote: >>>> >>>> Hi Chesnay! >>>>> That is an interesting problem, though hard to judge with the >>>>> information >>>>> we have. >>>>> >>>>> Can you elaborate a bit on the following points: >>>>> >>>>> - When putting the objects from the Java Flink side into the shared >>>>> memory, you need to serialize them. How do you do that? Into a buffer, >>>>> then >>>>> copy that into the shared memory ByteBuffer? Directly? >>>>> >>>>> - Shared memory access has to be somehow controlled. The pipes give >>>>> you >>>>> flow control for free (blocking write calls when the stream consumer is >>>>> busy). What do you do for the shared memory? Usually, one uses >>>>> semaphores, >>>>> or, in java File(Range)Locks to coordinate access and block until memory >>>>> regions are made available. Can you check if there are some busy waiting >>>>> parts in you code? >>>>> >>>>> - More general: The code is slower, but does it burn CPU cycles in >>>>> its >>>>> slowness or is it waiting for locks / monitors / conditions ? >>>>> >>>>> Stephan >>>>> >>>>> >>>>> >>>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler < >>>>> [hidden email]> wrote: >>>>> >>>>> Hello everyone, >>>>> >>>>>> This will be some kind of brainstorming question. >>>>>> >>>>>> As some of you may know I am currently working on the Python API. The >>>>>> most >>>>>> crucial part here is how the data is exchanged between Java and Python. >>>>>> Up to this point we used pipes for this, but switched recently to >>>>>> memory >>>>>> mapped files in hopes of increasing the (lacking) performance. >>>>>> >>>>>> Early (simplified) prototypes (outside of Flink) showed that this would >>>>>> yield a significant increase. yet when i added the code to flink and >>>>>> ran >>>>>> a >>>>>> job, there was >>>>>> no effect. like at all. two radically different schemes ran in >>>>>> /exactly/ >>>>>> the same time. >>>>>> >>>>>> my conclusion was that code already in place (and not part of the >>>>>> prototypes) is responsible for this. >>>>>> so i went ahead and modified the prototypes to use all relevant code >>>>>> from >>>>>> the Python API in order to narrow down the culprit. but this time, the >>>>>> performance increase was there. >>>>>> >>>>>> Now here's the question: How can the /very same code/ perform so much >>>>>> worse when integrated into flink? if the code is not the problem, what >>>>>> could be it? >>>>>> >>>>>> i spent a lot of time looking for that one line of code that cripples >>>>>> the >>>>>> performance, but I'm pretty much out of places to look. >>>>>> >>>>>> >>>>>> >>>>>> |
Hey!
The UDP version is 25x slower? That's massive. Are you sending the records through that as well, or just the coordination? Regarding busy waiting loops: There has to be a better way to do that. It will behave utterly unpredictable. Once the python side does I/O, has a separate process or thread or goes asynchronously into a library (scikitlearn, numpy), the loop cannot be expected to stay at 5%. You have tested that with a job where both java and python side have some work to do. In case of a job where one side waits for the other, the waiting side will burn cycles like crazy. Then run it in parallel (#cores) and you may get executions where little more happens then the busy waiting loop burning cycles. Stephan On Mon, Sep 8, 2014 at 4:15 PM, Chesnay Schepler < [hidden email]> wrote: > sorry for the late answer. > > today i did a quick hack to replace the synchronization completely with > udp. its still synchronous and record based, but 25x slower. > regarding busy-loops i would propose the following: > > 1. leave the python side as it is. its doing most of the heavy lifting > anyway and will run at 100% regardless of the loops. (the loops only > take up 5% of the total runtime) > 2. once we exchange buffers instead of single records the IO operations > and synchronization will take a fairly constant time. we could then > put the java process to sleep manually for that time instead of > waiting. it may not be as good as a blocking operation, but it > should keep the cpu consumption down to some extent. > > > On 1.9.2014 22:50, Ufuk Celebi wrote: > >> Hey Chesnay, >> >> any progress on this today? Are you going for the UDP buffer availability >> notifications Stephan proposed instead of the busy loop? >> >> Ufuk >> >> >> On Thu, Aug 28, 2014 at 1:06 AM, Chesnay Schepler < >> [hidden email]> wrote: >> >> the performance differences occur on the same system (16GB, 4 cores + >>> HyperThreading) with a DOP of 1 for a plan consisting of a single >>> operator. >>> plenty of resources :/ >>> >>> >>> On 28.8.2014 0:50, Stephan Ewen wrote: >>> >>> Hey Chesnay! >>>> >>>> Here are some thoughts: >>>> >>>> - The repeated checking for 1 or 0 is indeed a busy loop. These may >>>> behave >>>> very different in different settings. If you run the code isolated, you >>>> have a spare core for the thread and it barely hurts. Run multiple >>>> parallel >>>> instances in a larger framework, and it eats away CPU cycles from the >>>> threads that do the work - it starts hurting badly. >>>> >>>> - You may get around a copy into the shared memory (ByteBuffer into >>>> MemoryMappedFile) by creating an according DataOutputView - save one >>>> more >>>> data copy. That's the next step, though, first solve the other issue. >>>> >>>> The last time I implemented such an inter-process data pipe between >>>> languages, I had a similar issue: No support for system wide semaphores >>>> (or >>>> something similar) on both sides. >>>> >>>> I used Shared memory for the buffers, and a local network socket (UDP, >>>> but >>>> I guess TCP would be fine as well) for notifications when buffers are >>>> available. That worked pretty well, yielded high throughput, because the >>>> big buffers were not copied (unlike in streams), and the UDP >>>> notifications >>>> were very fast (fire and forget datagrams). >>>> >>>> Stephan >>>> >>>> >>>> >>>> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler < >>>> [hidden email]> wrote: >>>> >>>> Hey Stephan, >>>> >>>>> I'd like to point out right away that the code related to your >>>>> questions >>>>> is shared by both programs. >>>>> >>>>> regarding your first point: i have a byte[] into which i serialize the >>>>> data first using a ByteBuffer, and then write that data to a >>>>> MappedByteBuffer. >>>>> >>>>> regarding synchronization: i couldn't find a way to use elaborate >>>>> things >>>>> like semaphores or similar that work between python and java alike. >>>>> >>>>> the data exchange is currently completely synchronous. java writes a >>>>> record, sets an "isWritten" bit and then repeatedly checks this bit >>>>> whether >>>>> it is 0. python repeatedly checks this bit whether it is 1. once that >>>>> happens, it reads the record, sets the bit to 0 which tells java that >>>>> it >>>>> has read the record and can write the next one. this scheme works the >>>>> same >>>>> way the other way around. >>>>> >>>>> *NOW,* this may seem ... inefficient, to put it slightly. it is (or >>>>> rather >>>>> should be...) way faster (5x) that what we had so far though >>>>> (asynchronous >>>>> pipes). >>>>> (i also tried different schemes that all had no effect, so i decided to >>>>> stick with the easiest one) >>>>> >>>>> on to your last point: I'm gonna check for that tomorrow. >>>>> >>>>> >>>>> >>>>> >>>>> On 27.8.2014 20:45, Stephan Ewen wrote: >>>>> >>>>> Hi Chesnay! >>>>> >>>>>> That is an interesting problem, though hard to judge with the >>>>>> information >>>>>> we have. >>>>>> >>>>>> Can you elaborate a bit on the following points: >>>>>> >>>>>> - When putting the objects from the Java Flink side into the >>>>>> shared >>>>>> memory, you need to serialize them. How do you do that? Into a buffer, >>>>>> then >>>>>> copy that into the shared memory ByteBuffer? Directly? >>>>>> >>>>>> - Shared memory access has to be somehow controlled. The pipes >>>>>> give >>>>>> you >>>>>> flow control for free (blocking write calls when the stream consumer >>>>>> is >>>>>> busy). What do you do for the shared memory? Usually, one uses >>>>>> semaphores, >>>>>> or, in java File(Range)Locks to coordinate access and block until >>>>>> memory >>>>>> regions are made available. Can you check if there are some busy >>>>>> waiting >>>>>> parts in you code? >>>>>> >>>>>> - More general: The code is slower, but does it burn CPU cycles in >>>>>> its >>>>>> slowness or is it waiting for locks / monitors / conditions ? >>>>>> >>>>>> Stephan >>>>>> >>>>>> >>>>>> >>>>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler < >>>>>> [hidden email]> wrote: >>>>>> >>>>>> Hello everyone, >>>>>> >>>>>> This will be some kind of brainstorming question. >>>>>>> >>>>>>> As some of you may know I am currently working on the Python API. The >>>>>>> most >>>>>>> crucial part here is how the data is exchanged between Java and >>>>>>> Python. >>>>>>> Up to this point we used pipes for this, but switched recently to >>>>>>> memory >>>>>>> mapped files in hopes of increasing the (lacking) performance. >>>>>>> >>>>>>> Early (simplified) prototypes (outside of Flink) showed that this >>>>>>> would >>>>>>> yield a significant increase. yet when i added the code to flink and >>>>>>> ran >>>>>>> a >>>>>>> job, there was >>>>>>> no effect. like at all. two radically different schemes ran in >>>>>>> /exactly/ >>>>>>> the same time. >>>>>>> >>>>>>> my conclusion was that code already in place (and not part of the >>>>>>> prototypes) is responsible for this. >>>>>>> so i went ahead and modified the prototypes to use all relevant code >>>>>>> from >>>>>>> the Python API in order to narrow down the culprit. but this time, >>>>>>> the >>>>>>> performance increase was there. >>>>>>> >>>>>>> Now here's the question: How can the /very same code/ perform so much >>>>>>> worse when integrated into flink? if the code is not the problem, >>>>>>> what >>>>>>> could be it? >>>>>>> >>>>>>> i spent a lot of time looking for that one line of code that cripples >>>>>>> the >>>>>>> performance, but I'm pretty much out of places to look. >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> > |
only the coordination is done via UDP.
i agree with what you say about the loops; currently looking into using FileLocks. On 9.9.2014 11:33, Stephan Ewen wrote: > Hey! > > The UDP version is 25x slower? That's massive. Are you sending the records > through that as well, or just the coordination? > > Regarding busy waiting loops: There has to be a better way to do that. It > will behave utterly unpredictable. Once the python side does I/O, has a > separate process or thread or goes asynchronously into a library > (scikitlearn, numpy), the loop cannot be expected to stay at 5%. > > You have tested that with a job where both java and python side have some > work to do. In case of a job where one side waits for the other, the > waiting side will burn cycles like crazy. Then run it in parallel (#cores) > and you may get executions where little more happens then the busy waiting > loop burning cycles. > > Stephan > > > On Mon, Sep 8, 2014 at 4:15 PM, Chesnay Schepler < > [hidden email]> wrote: > >> sorry for the late answer. >> >> today i did a quick hack to replace the synchronization completely with >> udp. its still synchronous and record based, but 25x slower. >> regarding busy-loops i would propose the following: >> >> 1. leave the python side as it is. its doing most of the heavy lifting >> anyway and will run at 100% regardless of the loops. (the loops only >> take up 5% of the total runtime) >> 2. once we exchange buffers instead of single records the IO operations >> and synchronization will take a fairly constant time. we could then >> put the java process to sleep manually for that time instead of >> waiting. it may not be as good as a blocking operation, but it >> should keep the cpu consumption down to some extent. >> >> >> On 1.9.2014 22:50, Ufuk Celebi wrote: >> >>> Hey Chesnay, >>> >>> any progress on this today? Are you going for the UDP buffer availability >>> notifications Stephan proposed instead of the busy loop? >>> >>> Ufuk >>> >>> >>> On Thu, Aug 28, 2014 at 1:06 AM, Chesnay Schepler < >>> [hidden email]> wrote: >>> >>> the performance differences occur on the same system (16GB, 4 cores + >>>> HyperThreading) with a DOP of 1 for a plan consisting of a single >>>> operator. >>>> plenty of resources :/ >>>> >>>> >>>> On 28.8.2014 0:50, Stephan Ewen wrote: >>>> >>>> Hey Chesnay! >>>>> Here are some thoughts: >>>>> >>>>> - The repeated checking for 1 or 0 is indeed a busy loop. These may >>>>> behave >>>>> very different in different settings. If you run the code isolated, you >>>>> have a spare core for the thread and it barely hurts. Run multiple >>>>> parallel >>>>> instances in a larger framework, and it eats away CPU cycles from the >>>>> threads that do the work - it starts hurting badly. >>>>> >>>>> - You may get around a copy into the shared memory (ByteBuffer into >>>>> MemoryMappedFile) by creating an according DataOutputView - save one >>>>> more >>>>> data copy. That's the next step, though, first solve the other issue. >>>>> >>>>> The last time I implemented such an inter-process data pipe between >>>>> languages, I had a similar issue: No support for system wide semaphores >>>>> (or >>>>> something similar) on both sides. >>>>> >>>>> I used Shared memory for the buffers, and a local network socket (UDP, >>>>> but >>>>> I guess TCP would be fine as well) for notifications when buffers are >>>>> available. That worked pretty well, yielded high throughput, because the >>>>> big buffers were not copied (unlike in streams), and the UDP >>>>> notifications >>>>> were very fast (fire and forget datagrams). >>>>> >>>>> Stephan >>>>> >>>>> >>>>> >>>>> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler < >>>>> [hidden email]> wrote: >>>>> >>>>> Hey Stephan, >>>>> >>>>>> I'd like to point out right away that the code related to your >>>>>> questions >>>>>> is shared by both programs. >>>>>> >>>>>> regarding your first point: i have a byte[] into which i serialize the >>>>>> data first using a ByteBuffer, and then write that data to a >>>>>> MappedByteBuffer. >>>>>> >>>>>> regarding synchronization: i couldn't find a way to use elaborate >>>>>> things >>>>>> like semaphores or similar that work between python and java alike. >>>>>> >>>>>> the data exchange is currently completely synchronous. java writes a >>>>>> record, sets an "isWritten" bit and then repeatedly checks this bit >>>>>> whether >>>>>> it is 0. python repeatedly checks this bit whether it is 1. once that >>>>>> happens, it reads the record, sets the bit to 0 which tells java that >>>>>> it >>>>>> has read the record and can write the next one. this scheme works the >>>>>> same >>>>>> way the other way around. >>>>>> >>>>>> *NOW,* this may seem ... inefficient, to put it slightly. it is (or >>>>>> rather >>>>>> should be...) way faster (5x) that what we had so far though >>>>>> (asynchronous >>>>>> pipes). >>>>>> (i also tried different schemes that all had no effect, so i decided to >>>>>> stick with the easiest one) >>>>>> >>>>>> on to your last point: I'm gonna check for that tomorrow. >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> On 27.8.2014 20:45, Stephan Ewen wrote: >>>>>> >>>>>> Hi Chesnay! >>>>>> >>>>>>> That is an interesting problem, though hard to judge with the >>>>>>> information >>>>>>> we have. >>>>>>> >>>>>>> Can you elaborate a bit on the following points: >>>>>>> >>>>>>> - When putting the objects from the Java Flink side into the >>>>>>> shared >>>>>>> memory, you need to serialize them. How do you do that? Into a buffer, >>>>>>> then >>>>>>> copy that into the shared memory ByteBuffer? Directly? >>>>>>> >>>>>>> - Shared memory access has to be somehow controlled. The pipes >>>>>>> give >>>>>>> you >>>>>>> flow control for free (blocking write calls when the stream consumer >>>>>>> is >>>>>>> busy). What do you do for the shared memory? Usually, one uses >>>>>>> semaphores, >>>>>>> or, in java File(Range)Locks to coordinate access and block until >>>>>>> memory >>>>>>> regions are made available. Can you check if there are some busy >>>>>>> waiting >>>>>>> parts in you code? >>>>>>> >>>>>>> - More general: The code is slower, but does it burn CPU cycles in >>>>>>> its >>>>>>> slowness or is it waiting for locks / monitors / conditions ? >>>>>>> >>>>>>> Stephan >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler < >>>>>>> [hidden email]> wrote: >>>>>>> >>>>>>> Hello everyone, >>>>>>> >>>>>>> This will be some kind of brainstorming question. >>>>>>>> As some of you may know I am currently working on the Python API. The >>>>>>>> most >>>>>>>> crucial part here is how the data is exchanged between Java and >>>>>>>> Python. >>>>>>>> Up to this point we used pipes for this, but switched recently to >>>>>>>> memory >>>>>>>> mapped files in hopes of increasing the (lacking) performance. >>>>>>>> >>>>>>>> Early (simplified) prototypes (outside of Flink) showed that this >>>>>>>> would >>>>>>>> yield a significant increase. yet when i added the code to flink and >>>>>>>> ran >>>>>>>> a >>>>>>>> job, there was >>>>>>>> no effect. like at all. two radically different schemes ran in >>>>>>>> /exactly/ >>>>>>>> the same time. >>>>>>>> >>>>>>>> my conclusion was that code already in place (and not part of the >>>>>>>> prototypes) is responsible for this. >>>>>>>> so i went ahead and modified the prototypes to use all relevant code >>>>>>>> from >>>>>>>> the Python API in order to narrow down the culprit. but this time, >>>>>>>> the >>>>>>>> performance increase was there. >>>>>>>> >>>>>>>> Now here's the question: How can the /very same code/ perform so much >>>>>>>> worse when integrated into flink? if the code is not the problem, >>>>>>>> what >>>>>>>> could be it? >>>>>>>> >>>>>>>> i spent a lot of time looking for that one line of code that cripples >>>>>>>> the >>>>>>>> performance, but I'm pretty much out of places to look. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> |
Maybe there is some quirk in the way you use the datagrams. Have you tried
it through TCP sockets? On Wed, Sep 10, 2014 at 2:30 PM, Chesnay Schepler < [hidden email]> wrote: > only the coordination is done via UDP. > > i agree with what you say about the loops; currently looking into using > FileLocks. > > > On 9.9.2014 11:33, Stephan Ewen wrote: > >> Hey! >> >> The UDP version is 25x slower? That's massive. Are you sending the records >> through that as well, or just the coordination? >> >> Regarding busy waiting loops: There has to be a better way to do that. It >> will behave utterly unpredictable. Once the python side does I/O, has a >> separate process or thread or goes asynchronously into a library >> (scikitlearn, numpy), the loop cannot be expected to stay at 5%. >> >> You have tested that with a job where both java and python side have some >> work to do. In case of a job where one side waits for the other, the >> waiting side will burn cycles like crazy. Then run it in parallel (#cores) >> and you may get executions where little more happens then the busy waiting >> loop burning cycles. >> >> Stephan >> >> >> On Mon, Sep 8, 2014 at 4:15 PM, Chesnay Schepler < >> [hidden email]> wrote: >> >> sorry for the late answer. >>> >>> today i did a quick hack to replace the synchronization completely with >>> udp. its still synchronous and record based, but 25x slower. >>> regarding busy-loops i would propose the following: >>> >>> 1. leave the python side as it is. its doing most of the heavy lifting >>> anyway and will run at 100% regardless of the loops. (the loops only >>> take up 5% of the total runtime) >>> 2. once we exchange buffers instead of single records the IO operations >>> and synchronization will take a fairly constant time. we could then >>> put the java process to sleep manually for that time instead of >>> waiting. it may not be as good as a blocking operation, but it >>> should keep the cpu consumption down to some extent. >>> >>> >>> On 1.9.2014 22:50, Ufuk Celebi wrote: >>> >>> Hey Chesnay, >>>> >>>> any progress on this today? Are you going for the UDP buffer >>>> availability >>>> notifications Stephan proposed instead of the busy loop? >>>> >>>> Ufuk >>>> >>>> >>>> On Thu, Aug 28, 2014 at 1:06 AM, Chesnay Schepler < >>>> [hidden email]> wrote: >>>> >>>> the performance differences occur on the same system (16GB, 4 cores + >>>> >>>>> HyperThreading) with a DOP of 1 for a plan consisting of a single >>>>> operator. >>>>> plenty of resources :/ >>>>> >>>>> >>>>> On 28.8.2014 0:50, Stephan Ewen wrote: >>>>> >>>>> Hey Chesnay! >>>>> >>>>>> Here are some thoughts: >>>>>> >>>>>> - The repeated checking for 1 or 0 is indeed a busy loop. These >>>>>> may >>>>>> behave >>>>>> very different in different settings. If you run the code isolated, >>>>>> you >>>>>> have a spare core for the thread and it barely hurts. Run multiple >>>>>> parallel >>>>>> instances in a larger framework, and it eats away CPU cycles from the >>>>>> threads that do the work - it starts hurting badly. >>>>>> >>>>>> - You may get around a copy into the shared memory (ByteBuffer >>>>>> into >>>>>> MemoryMappedFile) by creating an according DataOutputView - save one >>>>>> more >>>>>> data copy. That's the next step, though, first solve the other issue. >>>>>> >>>>>> The last time I implemented such an inter-process data pipe between >>>>>> languages, I had a similar issue: No support for system wide >>>>>> semaphores >>>>>> (or >>>>>> something similar) on both sides. >>>>>> >>>>>> I used Shared memory for the buffers, and a local network socket (UDP, >>>>>> but >>>>>> I guess TCP would be fine as well) for notifications when buffers are >>>>>> available. That worked pretty well, yielded high throughput, because >>>>>> the >>>>>> big buffers were not copied (unlike in streams), and the UDP >>>>>> notifications >>>>>> were very fast (fire and forget datagrams). >>>>>> >>>>>> Stephan >>>>>> >>>>>> >>>>>> >>>>>> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler < >>>>>> [hidden email]> wrote: >>>>>> >>>>>> Hey Stephan, >>>>>> >>>>>> I'd like to point out right away that the code related to your >>>>>>> questions >>>>>>> is shared by both programs. >>>>>>> >>>>>>> regarding your first point: i have a byte[] into which i serialize >>>>>>> the >>>>>>> data first using a ByteBuffer, and then write that data to a >>>>>>> MappedByteBuffer. >>>>>>> >>>>>>> regarding synchronization: i couldn't find a way to use elaborate >>>>>>> things >>>>>>> like semaphores or similar that work between python and java alike. >>>>>>> >>>>>>> the data exchange is currently completely synchronous. java writes a >>>>>>> record, sets an "isWritten" bit and then repeatedly checks this bit >>>>>>> whether >>>>>>> it is 0. python repeatedly checks this bit whether it is 1. once that >>>>>>> happens, it reads the record, sets the bit to 0 which tells java that >>>>>>> it >>>>>>> has read the record and can write the next one. this scheme works the >>>>>>> same >>>>>>> way the other way around. >>>>>>> >>>>>>> *NOW,* this may seem ... inefficient, to put it slightly. it is (or >>>>>>> rather >>>>>>> should be...) way faster (5x) that what we had so far though >>>>>>> (asynchronous >>>>>>> pipes). >>>>>>> (i also tried different schemes that all had no effect, so i decided >>>>>>> to >>>>>>> stick with the easiest one) >>>>>>> >>>>>>> on to your last point: I'm gonna check for that tomorrow. >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> On 27.8.2014 20:45, Stephan Ewen wrote: >>>>>>> >>>>>>> Hi Chesnay! >>>>>>> >>>>>>> That is an interesting problem, though hard to judge with the >>>>>>>> information >>>>>>>> we have. >>>>>>>> >>>>>>>> Can you elaborate a bit on the following points: >>>>>>>> >>>>>>>> - When putting the objects from the Java Flink side into the >>>>>>>> shared >>>>>>>> memory, you need to serialize them. How do you do that? Into a >>>>>>>> buffer, >>>>>>>> then >>>>>>>> copy that into the shared memory ByteBuffer? Directly? >>>>>>>> >>>>>>>> - Shared memory access has to be somehow controlled. The pipes >>>>>>>> give >>>>>>>> you >>>>>>>> flow control for free (blocking write calls when the stream consumer >>>>>>>> is >>>>>>>> busy). What do you do for the shared memory? Usually, one uses >>>>>>>> semaphores, >>>>>>>> or, in java File(Range)Locks to coordinate access and block until >>>>>>>> memory >>>>>>>> regions are made available. Can you check if there are some busy >>>>>>>> waiting >>>>>>>> parts in you code? >>>>>>>> >>>>>>>> - More general: The code is slower, but does it burn CPU >>>>>>>> cycles in >>>>>>>> its >>>>>>>> slowness or is it waiting for locks / monitors / conditions ? >>>>>>>> >>>>>>>> Stephan >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler < >>>>>>>> [hidden email]> wrote: >>>>>>>> >>>>>>>> Hello everyone, >>>>>>>> >>>>>>>> This will be some kind of brainstorming question. >>>>>>>> >>>>>>>>> As some of you may know I am currently working on the Python API. >>>>>>>>> The >>>>>>>>> most >>>>>>>>> crucial part here is how the data is exchanged between Java and >>>>>>>>> Python. >>>>>>>>> Up to this point we used pipes for this, but switched recently to >>>>>>>>> memory >>>>>>>>> mapped files in hopes of increasing the (lacking) performance. >>>>>>>>> >>>>>>>>> Early (simplified) prototypes (outside of Flink) showed that this >>>>>>>>> would >>>>>>>>> yield a significant increase. yet when i added the code to flink >>>>>>>>> and >>>>>>>>> ran >>>>>>>>> a >>>>>>>>> job, there was >>>>>>>>> no effect. like at all. two radically different schemes ran in >>>>>>>>> /exactly/ >>>>>>>>> the same time. >>>>>>>>> >>>>>>>>> my conclusion was that code already in place (and not part of the >>>>>>>>> prototypes) is responsible for this. >>>>>>>>> so i went ahead and modified the prototypes to use all relevant >>>>>>>>> code >>>>>>>>> from >>>>>>>>> the Python API in order to narrow down the culprit. but this time, >>>>>>>>> the >>>>>>>>> performance increase was there. >>>>>>>>> >>>>>>>>> Now here's the question: How can the /very same code/ perform so >>>>>>>>> much >>>>>>>>> worse when integrated into flink? if the code is not the problem, >>>>>>>>> what >>>>>>>>> could be it? >>>>>>>>> >>>>>>>>> i spent a lot of time looking for that one line of code that >>>>>>>>> cripples >>>>>>>>> the >>>>>>>>> performance, but I'm pretty much out of places to look. >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> > |
havent tried tcp.
all i do is create a socket and use send/receive operations as some kind of semaphore. i dont even access the contents of the datagram. On 10.9.2014 15:17, Stephan Ewen wrote: > Maybe there is some quirk in the way you use the datagrams. Have you tried > it through TCP sockets? > > On Wed, Sep 10, 2014 at 2:30 PM, Chesnay Schepler < > [hidden email]> wrote: > >> only the coordination is done via UDP. >> >> i agree with what you say about the loops; currently looking into using >> FileLocks. >> >> >> On 9.9.2014 11:33, Stephan Ewen wrote: >> >>> Hey! >>> >>> The UDP version is 25x slower? That's massive. Are you sending the records >>> through that as well, or just the coordination? >>> >>> Regarding busy waiting loops: There has to be a better way to do that. It >>> will behave utterly unpredictable. Once the python side does I/O, has a >>> separate process or thread or goes asynchronously into a library >>> (scikitlearn, numpy), the loop cannot be expected to stay at 5%. >>> >>> You have tested that with a job where both java and python side have some >>> work to do. In case of a job where one side waits for the other, the >>> waiting side will burn cycles like crazy. Then run it in parallel (#cores) >>> and you may get executions where little more happens then the busy waiting >>> loop burning cycles. >>> >>> Stephan >>> >>> >>> On Mon, Sep 8, 2014 at 4:15 PM, Chesnay Schepler < >>> [hidden email]> wrote: >>> >>> sorry for the late answer. >>>> today i did a quick hack to replace the synchronization completely with >>>> udp. its still synchronous and record based, but 25x slower. >>>> regarding busy-loops i would propose the following: >>>> >>>> 1. leave the python side as it is. its doing most of the heavy lifting >>>> anyway and will run at 100% regardless of the loops. (the loops only >>>> take up 5% of the total runtime) >>>> 2. once we exchange buffers instead of single records the IO operations >>>> and synchronization will take a fairly constant time. we could then >>>> put the java process to sleep manually for that time instead of >>>> waiting. it may not be as good as a blocking operation, but it >>>> should keep the cpu consumption down to some extent. >>>> >>>> >>>> On 1.9.2014 22:50, Ufuk Celebi wrote: >>>> >>>> Hey Chesnay, >>>>> any progress on this today? Are you going for the UDP buffer >>>>> availability >>>>> notifications Stephan proposed instead of the busy loop? >>>>> >>>>> Ufuk >>>>> >>>>> >>>>> On Thu, Aug 28, 2014 at 1:06 AM, Chesnay Schepler < >>>>> [hidden email]> wrote: >>>>> >>>>> the performance differences occur on the same system (16GB, 4 cores + >>>>> >>>>>> HyperThreading) with a DOP of 1 for a plan consisting of a single >>>>>> operator. >>>>>> plenty of resources :/ >>>>>> >>>>>> >>>>>> On 28.8.2014 0:50, Stephan Ewen wrote: >>>>>> >>>>>> Hey Chesnay! >>>>>> >>>>>>> Here are some thoughts: >>>>>>> >>>>>>> - The repeated checking for 1 or 0 is indeed a busy loop. These >>>>>>> may >>>>>>> behave >>>>>>> very different in different settings. If you run the code isolated, >>>>>>> you >>>>>>> have a spare core for the thread and it barely hurts. Run multiple >>>>>>> parallel >>>>>>> instances in a larger framework, and it eats away CPU cycles from the >>>>>>> threads that do the work - it starts hurting badly. >>>>>>> >>>>>>> - You may get around a copy into the shared memory (ByteBuffer >>>>>>> into >>>>>>> MemoryMappedFile) by creating an according DataOutputView - save one >>>>>>> more >>>>>>> data copy. That's the next step, though, first solve the other issue. >>>>>>> >>>>>>> The last time I implemented such an inter-process data pipe between >>>>>>> languages, I had a similar issue: No support for system wide >>>>>>> semaphores >>>>>>> (or >>>>>>> something similar) on both sides. >>>>>>> >>>>>>> I used Shared memory for the buffers, and a local network socket (UDP, >>>>>>> but >>>>>>> I guess TCP would be fine as well) for notifications when buffers are >>>>>>> available. That worked pretty well, yielded high throughput, because >>>>>>> the >>>>>>> big buffers were not copied (unlike in streams), and the UDP >>>>>>> notifications >>>>>>> were very fast (fire and forget datagrams). >>>>>>> >>>>>>> Stephan >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Wed, Aug 27, 2014 at 10:48 PM, Chesnay Schepler < >>>>>>> [hidden email]> wrote: >>>>>>> >>>>>>> Hey Stephan, >>>>>>> >>>>>>> I'd like to point out right away that the code related to your >>>>>>>> questions >>>>>>>> is shared by both programs. >>>>>>>> >>>>>>>> regarding your first point: i have a byte[] into which i serialize >>>>>>>> the >>>>>>>> data first using a ByteBuffer, and then write that data to a >>>>>>>> MappedByteBuffer. >>>>>>>> >>>>>>>> regarding synchronization: i couldn't find a way to use elaborate >>>>>>>> things >>>>>>>> like semaphores or similar that work between python and java alike. >>>>>>>> >>>>>>>> the data exchange is currently completely synchronous. java writes a >>>>>>>> record, sets an "isWritten" bit and then repeatedly checks this bit >>>>>>>> whether >>>>>>>> it is 0. python repeatedly checks this bit whether it is 1. once that >>>>>>>> happens, it reads the record, sets the bit to 0 which tells java that >>>>>>>> it >>>>>>>> has read the record and can write the next one. this scheme works the >>>>>>>> same >>>>>>>> way the other way around. >>>>>>>> >>>>>>>> *NOW,* this may seem ... inefficient, to put it slightly. it is (or >>>>>>>> rather >>>>>>>> should be...) way faster (5x) that what we had so far though >>>>>>>> (asynchronous >>>>>>>> pipes). >>>>>>>> (i also tried different schemes that all had no effect, so i decided >>>>>>>> to >>>>>>>> stick with the easiest one) >>>>>>>> >>>>>>>> on to your last point: I'm gonna check for that tomorrow. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On 27.8.2014 20:45, Stephan Ewen wrote: >>>>>>>> >>>>>>>> Hi Chesnay! >>>>>>>> >>>>>>>> That is an interesting problem, though hard to judge with the >>>>>>>>> information >>>>>>>>> we have. >>>>>>>>> >>>>>>>>> Can you elaborate a bit on the following points: >>>>>>>>> >>>>>>>>> - When putting the objects from the Java Flink side into the >>>>>>>>> shared >>>>>>>>> memory, you need to serialize them. How do you do that? Into a >>>>>>>>> buffer, >>>>>>>>> then >>>>>>>>> copy that into the shared memory ByteBuffer? Directly? >>>>>>>>> >>>>>>>>> - Shared memory access has to be somehow controlled. The pipes >>>>>>>>> give >>>>>>>>> you >>>>>>>>> flow control for free (blocking write calls when the stream consumer >>>>>>>>> is >>>>>>>>> busy). What do you do for the shared memory? Usually, one uses >>>>>>>>> semaphores, >>>>>>>>> or, in java File(Range)Locks to coordinate access and block until >>>>>>>>> memory >>>>>>>>> regions are made available. Can you check if there are some busy >>>>>>>>> waiting >>>>>>>>> parts in you code? >>>>>>>>> >>>>>>>>> - More general: The code is slower, but does it burn CPU >>>>>>>>> cycles in >>>>>>>>> its >>>>>>>>> slowness or is it waiting for locks / monitors / conditions ? >>>>>>>>> >>>>>>>>> Stephan >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On Wed, Aug 27, 2014 at 8:34 PM, Chesnay Schepler < >>>>>>>>> [hidden email]> wrote: >>>>>>>>> >>>>>>>>> Hello everyone, >>>>>>>>> >>>>>>>>> This will be some kind of brainstorming question. >>>>>>>>> >>>>>>>>>> As some of you may know I am currently working on the Python API. >>>>>>>>>> The >>>>>>>>>> most >>>>>>>>>> crucial part here is how the data is exchanged between Java and >>>>>>>>>> Python. >>>>>>>>>> Up to this point we used pipes for this, but switched recently to >>>>>>>>>> memory >>>>>>>>>> mapped files in hopes of increasing the (lacking) performance. >>>>>>>>>> >>>>>>>>>> Early (simplified) prototypes (outside of Flink) showed that this >>>>>>>>>> would >>>>>>>>>> yield a significant increase. yet when i added the code to flink >>>>>>>>>> and >>>>>>>>>> ran >>>>>>>>>> a >>>>>>>>>> job, there was >>>>>>>>>> no effect. like at all. two radically different schemes ran in >>>>>>>>>> /exactly/ >>>>>>>>>> the same time. >>>>>>>>>> >>>>>>>>>> my conclusion was that code already in place (and not part of the >>>>>>>>>> prototypes) is responsible for this. >>>>>>>>>> so i went ahead and modified the prototypes to use all relevant >>>>>>>>>> code >>>>>>>>>> from >>>>>>>>>> the Python API in order to narrow down the culprit. but this time, >>>>>>>>>> the >>>>>>>>>> performance increase was there. >>>>>>>>>> >>>>>>>>>> Now here's the question: How can the /very same code/ perform so >>>>>>>>>> much >>>>>>>>>> worse when integrated into flink? if the code is not the problem, >>>>>>>>>> what >>>>>>>>>> could be it? >>>>>>>>>> >>>>>>>>>> i spent a lot of time looking for that one line of code that >>>>>>>>>> cripples >>>>>>>>>> the >>>>>>>>>> performance, but I'm pretty much out of places to look. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> |
Free forum by Nabble | Edit this page |