Hello all,
I am working on a program where the solution set type is Tuple2<Long, Tuple2<Long, Double>>. At some point, I need to join with the solution set on the first field of the nested tuple (the Double), but I am getting an error that "the solution set may only be joined with using tuple field positions". Since join and cogroup are the only operations allowed on the solution set, I cannot "flatten" the data before joining.

Any suggestions on a solution? What is the reason KeySelectors are not supported here?

Thanks!
Vasia.
Hi Vasia!
There is no fundamental reason; we simply have not gotten around to implementing it yet. Any help along these lines is highly welcome.

One reason that held us back is that we need to make sure that the key of the solution set and the key of the join are the same, which is hard to verify with general functions. One approach is to change the delta iteration API to define the keys in only one place (the definition of the iteration) and to offer special "joinWithSolution" and "coGroupWithSolution" functions, rather than using the regular join syntax (which allows invalid constructs to be created).

What are your thoughts on this, from a DeltaIteration user perspective?

Greetings,
Stephan
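To make the "define the key in only one place" idea concrete, here is a minimal plain-Java sketch with no Flink dependencies. `KeyedSolutionSet`, `joinWithSolution`, and `JoinWithSolutionDemo` are hypothetical names chosen for illustration, not Flink's API. The solution set's key extractor is fixed at construction (standing in for the iteration definition), and `joinWithSolution` asks only for the workset-side key, so the two keys cannot silently diverge.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch (not Flink's API): the solution set's key extractor is
// fixed once, at construction time, standing in for the iteration definition.
final class KeyedSolutionSet<K, S> {
    private final Function<S, K> solutionKey;
    private final Map<K, S> table = new HashMap<>();

    KeyedSolutionSet(Function<S, K> solutionKey, List<S> initial) {
        this.solutionKey = solutionKey;
        for (S s : initial) {
            table.put(solutionKey.apply(s), s);
        }
    }

    // Callers supply only the workset-side key; the solution side always uses
    // the key declared above, so the two keys cannot silently diverge.
    <W, O> List<O> joinWithSolution(List<W> workset,
                                    Function<W, K> worksetKey,
                                    BiFunction<W, S, O> joiner) {
        List<O> out = new ArrayList<>();
        for (W w : workset) {
            S match = table.get(worksetKey.apply(w));
            if (match != null) {
                out.add(joiner.apply(w, match));
            }
        }
        return out;
    }

    // Updates go through the same key extractor, keeping the table consistent.
    void update(S s) {
        table.put(solutionKey.apply(s), s);
    }
}

class JoinWithSolutionDemo {
    public static void main(String[] args) {
        // Solution entries {vertexId, value}, keyed by vertexId (index 0).
        KeyedSolutionSet<Long, long[]> solution = new KeyedSolutionSet<Long, long[]>(
                e -> e[0], List.of(new long[] {1, 10}, new long[] {2, 20}));
        // Workset entries {targetVertexId, delta}.
        List<long[]> workset = List.of(new long[] {2, 5}, new long[] {3, 7});
        List<Long> sums = solution.joinWithSolution(
                workset, w -> w[0], (w, s) -> s[1] + w[1]);
        System.out.println(sums); // prints [25]
    }
}
```

The design choice is that the solution side's key never appears at the join call site, which is what makes a mismatched key impossible to express in this sketch.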
Hey,
thanks for replying so fast :)

I saw the discussion in a previous thread about changing the API to offer more explicit join functions. I think providing these special functions is a good way to rule out any other kind of interaction with the solution set.

However, as a user, I would like a more flexible way of interacting with the state of the iteration. In the program I described above, I actually want to join on the value of the solution set, not the key. It would be nice to have access to the solution set like any other normal DataSet.

I'm not sure how this could be supported, but if you think this is a good idea, I could work on it!

Cheers,
V.

(In reply to Stephan Ewen, 31 July 2014 15:50)
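A simplified way to see why joining on the value is harder than joining on the key: if the solution set behaves like a hash table from key to value (an assumption for illustration, consistent with the "distributed hash table" mentioned later in this thread), a key join is a single probe, while a join on a value field cannot use that table and needs a full pass, used here to build a secondary index. `ValueJoinSketch` and its methods are hypothetical, plain Java only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model (assumption): the solution set as a hash table key -> value.
class ValueJoinSketch {
    // Value part, loosely shaped like the nested Tuple2<Long, Double>.
    static final class Value {
        final long f0;
        final double f1;

        Value(long f0, double f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    // Join on the key: one hash probe into the key-organized table.
    static Value joinOnKey(Map<Long, Value> solution, long key) {
        return solution.get(key);
    }

    // Join on a value field: the key-organized table cannot help, so this
    // sketch makes a full pass to build a secondary index value.f0 -> keys.
    static Map<Long, List<Long>> indexByValueField(Map<Long, Value> solution) {
        Map<Long, List<Long>> index = new HashMap<>();
        for (Map.Entry<Long, Value> e : solution.entrySet()) {
            index.computeIfAbsent(e.getValue().f0, k -> new ArrayList<>()).add(e.getKey());
        }
        return index;
    }

    public static void main(String[] args) {
        Map<Long, Value> solution = new HashMap<>();
        solution.put(1L, new Value(7L, 0.5));
        solution.put(2L, new Value(7L, 0.25));
        solution.put(3L, new Value(9L, 1.0));
        System.out.println(joinOnKey(solution, 3L).f0);          // prints 9
        System.out.println(indexByValueField(solution).get(9L)); // prints [3]
    }
}
```

In a distributed setting the table is also partitioned by key, so a value-field join would presumably also need the probe side and the index to be co-partitioned on that value field; the sketch ignores this.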
Hey!
Sorry for the late reply, but here are some thoughts in that direction:

Stateful operations:

To keep state (in the form of a hash map or similar) around in iterations, you do not need to use the restricted means that delta iterations give you. All function instances stay around for all supersteps, so a function can simply create a map internally and update it as you like.

There is some example code at the end of this mail (listing 1), or nicely formatted in this gist: https://gist.github.com/StephanEwen/7815c1f269f1f79b8e09

State across functions:

If you want state that is accessed from multiple functions, that is also possible. Use a static broker to fetch the map, as in listing 2 or this gist: https://gist.github.com/StephanEwen/5cdfa628d0e05b99f328

Making sure data is partitioned:

The trickier part is making sure that the data is partitioned when it enters the functions. Right now, this needs a hack: a GroupReduce function that simply emits everything. This causes an unnecessary and costly sort, so we need to get that out of the way. Until then, it should allow you to write a prototype.

The good news is that we are planning to add hints that tell the system to partition/rebalance/etc. data sets for operations. This would solve the issue nicely. It is not a terribly large change, so it should not take too long. I'll keep you posted on the progress...

Greetings,
Stephan

---------------------------------------------------------
Sample 1: Flexible State in a Mapper
---------------------------------------------------------

public static class MyMapper extends RichMapPartitionFunction<Edge, Edge> {

    private final Map<Long, Edge> state = new HashMap<Long, Edge>();

    @Override
    public void open(Configuration conf) throws Exception {
        // load the state once; the same instance is reused in later supersteps
        if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
            String pathToFragment = "hdfs://... or file://...";
            CsvInputFormat<Edge> reader = new CsvInputFormat<>(new Path(pathToFragment));
            reader.configure(new Configuration());
            reader.open(new FileInputSplit(0, new Path(pathToFragment), 0, 36584, null));
            while (!reader.reachedEnd()) {
                Edge next = reader.nextRecord(new Edge());
                // nextRecord() may return null, e.g. at the end of the split
                if (next != null) {
                    state.put(next.f0, next);
                }
            }
            reader.close();
        }
    }

    @Override
    public void close() throws Exception {
        // check whether to write the state out
        if (getIterationRuntimeContext().getSuperstepNumber() == 42) {
            // write the state (similar code as in open() for the reader)
        }
    }

    @Override
    public void mapPartition(Iterable<Edge> records, Collector<Edge> out) throws Exception {
        // do something with the state
        for (Edge e : records) {
            Edge other = state.get(e.f0);
            // do something, then update the state: state.put(...);
        }
    }
}

=======================================================

---------------------------------------------------------
Sample 2: Sharing state across functions
---------------------------------------------------------

public class StateBroker {

    // one state map per parallel subtask index
    public static final ConcurrentHashMap<Integer, Map<Long, Edge>> BROKER = new ConcurrentHashMap<>();

    public static Map<Long, Edge> getForSubtask(int subtask) {
        Map<Long, Edge> entry = BROKER.get(subtask);
        if (entry == null) {
            entry = new HashMap<>();
            // another function of the same subtask may have registered a map concurrently
            Map<Long, Edge> previous = BROKER.putIfAbsent(subtask, entry);
            entry = previous == null ? entry : previous;
        }
        return entry;
    }
}

public static class MyMapper extends RichMapPartitionFunction<Edge, Edge> {

    private Map<Long, Edge> state;

    @Override
    public void open(Configuration conf) throws Exception {
        // fetch the shared state once; the field keeps it for later supersteps
        if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
            state = StateBroker.getForSubtask(getRuntimeContext().getIndexOfThisSubtask());
        }
    }

    ...
}

(In reply to Vasiliki Kalavri, Thu, Jul 31, 2014 at 4:15 PM)
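The stateful-function approach relies on the runtime reusing the same function instance in every superstep. The following plain-Java simulation (no Flink classes; `StatefulPartitionFunction` and `SuperstepDriver` are hypothetical stand-ins for a rich function and the iteration runtime) shows why a map held in a field accumulates across supersteps when `open`, `process`, and `close` are called on one instance per superstep.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a rich function. The simulated runtime below calls
// open()/process()/close() on the SAME instance in every superstep, so the
// map held in a field keeps its contents between supersteps.
class StatefulPartitionFunction {
    private final Map<Long, Long> state = new HashMap<>(); // key -> times seen

    void open(int superstepNumber) {
        // Stephan's sample loads the initial state here when superstepNumber == 1.
    }

    void process(List<Long> keys) {
        for (Long k : keys) {
            state.merge(k, 1L, Long::sum);
        }
    }

    void close(int superstepNumber) {
        // A real job might write the state out after its final superstep.
    }

    long countFor(long key) {
        return state.getOrDefault(key, 0L);
    }
}

class SuperstepDriver {
    // Simulates the iteration runtime: one instance, reused for every superstep.
    static StatefulPartitionFunction run(List<List<Long>> inputsPerSuperstep) {
        StatefulPartitionFunction fn = new StatefulPartitionFunction();
        int superstep = 1;
        for (List<Long> input : inputsPerSuperstep) {
            fn.open(superstep);
            fn.process(input);
            fn.close(superstep);
            superstep++;
        }
        return fn;
    }

    public static void main(String[] args) {
        StatefulPartitionFunction fn =
                run(List.of(List.of(1L, 2L), List.of(1L), List.of(1L, 3L)));
        System.out.println(fn.countFor(1L)); // prints 3
    }
}
```

If the runtime instead created a fresh instance per superstep, the count above would reset each time; the approach depends entirely on instance reuse.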
Addendum: The issue to enforce partitioning of the data set is tracked
here: https://issues.apache.org/jira/browse/FLINK-1060

(In reply to Stephan Ewen, Sat, Aug 30, 2014 at 6:01 PM)
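Subtask-local state only works if every record with a given key reaches the subtask that owns that key, which is what the partitioning hints (and the interim GroupReduce hack) are meant to guarantee. A minimal sketch of that routing invariant, assuming simple hash partitioning via `Math.floorMod(Long.hashCode(key), parallelism)`; Flink's own partitioner may use a different hash function, and `HashRouting` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch (assumption): plain hash partitioning. Every record with the same key
// is routed to the same subtask, so that subtask's local state map sees all of
// that key's records. Flink's own partitioner may hash differently.
class HashRouting {
    static int targetSubtask(long key, int parallelism) {
        return Math.floorMod(Long.hashCode(key), parallelism);
    }

    static List<List<Long>> partition(List<Long> keys, int parallelism) {
        List<List<Long>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (long k : keys) {
            partitions.get(targetSubtask(k, parallelism)).add(k);
        }
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(partition(List.of(1L, 2L, 3L, 4L, 1L), 2)); // prints [[2, 4], [1, 3, 1]]
    }
}
```

Both occurrences of key 1 land in the same partition, so a per-subtask map (as in the StateBroker listing) would see all of that key's updates.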
Hi guys,
sorry for being so late to this discussion, but I was thinking about how we could make iterations more intuitive. My idea would be to make access to the solution set explicit in delta iterations. It should be usable as input for any operation, and updates to it would be done through a special construct, not by joining with it. Maybe we could have a special UpdateSolution operator that makes sure the data is partitioned correctly for the distributed hash table in which the solution is stored. Or we could have a function on the iteration context that can update the solution from an arbitrary operator. In my idea, the bulk iteration would be the base operation and the other iterations would add special features to it. I would also like to add an iteration type where you can output an element in every iteration and the result is the concatenation of those elements. This seems to be required for some machine-learning-style algorithms.

What do you guys think about this? I'm not sure about the details here since I'm busy with other low-level stuff (Generalizing Pair Comparators and the Scala API), but I was planning to tackle this afterwards. If someone else wants to look into this, I'd be happy to help and discuss, though. :D

Cheers,
Aljoscha

(In reply to Stephan Ewen, Sat, Aug 30, 2014 at 6:27 PM)
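The proposed iteration type, which emits one element per iteration and returns their concatenation, can be sketched as a plain loop. `CollectingIteration`, `StepResult`, and `iterate` are hypothetical names; the step function is assumed to return both the next partial solution and the emitted element, and the halving example merely stands in for something like a per-iteration metric.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical "collecting" iteration: each superstep produces the next partial
// solution plus one emitted element; the result is the concatenation of all
// emitted elements, in superstep order.
class CollectingIteration {
    static final class StepResult<T, E> {
        final T next;
        final E emitted;

        StepResult(T next, E emitted) {
            this.next = next;
            this.emitted = emitted;
        }
    }

    static <T, E> List<E> iterate(T initial, int supersteps, Function<T, StepResult<T, E>> step) {
        List<E> collected = new ArrayList<>();
        T current = initial;
        for (int i = 0; i < supersteps; i++) {
            StepResult<T, E> r = step.apply(current);
            collected.add(r.emitted);
            current = r.next;
        }
        return collected;
    }

    public static void main(String[] args) {
        // Toy step: halve the value and emit the value seen at the start of the superstep.
        List<Double> trace = CollectingIteration.<Double, Double>iterate(
                8.0, 4, x -> new StepResult<Double, Double>(x / 2, x));
        System.out.println(trace); // prints [8.0, 4.0, 2.0, 1.0]
    }
}
```

Unlike a delta iteration, the result here is not keyed state but an ordered log of per-superstep outputs, which is why it does not fit the "solution set" form.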
Hello all,
thank you Stephan and Aljoscha for the great input!

Regarding my initial issue in this e-mail, I solved it a while ago by (1) flattening the input before the join and (2) realizing that there was an easier way to implement my algorithm than what I was trying to do o.O

However, several interesting discussion points came out of this, so let me try to summarize:

(1) Support joining with the solution set using key selectors.
This is the initial problem that started this discussion. As Stephan explains above, a possible solution is to provide explicit "joinWithSolution" and "coGroupWithSolution" functions to make sure the keys used are valid.

(2) Allow operations other than join and cogroup on the solution set.
For this one, I would first like to understand whether restricting the operations to join and cogroup was a requirement for some reason (like disallowing invalid constructs) or whether you did not see a use case for it. The solution here could be special functions or constructs, like Aljoscha suggests.

(3) Support types of iterations where the result does not have to be in "solution set" form.
An example is the one Aljoscha gives: outputting one element per iteration and concatenating them. I like this idea, but I think it requires a bit more thought and should be something more general.

Do you have any other related issues to add?

Personally, I'm interested in all of the above and I believe I can dedicate some time to working on them :)
So, what do you think?

Cheers,
V.

(In reply to Aljoscha Krettek, 31 August 2014 08:54)
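The flattening workaround can be sketched without Flink: a nested record shaped like Tuple2<Long, Tuple2<Long, Double>> is turned into a flat three-field record, so that a join accepting only top-level field positions can address the former nested field (here, the nested Long). `FlattenForJoin`, `Nested`, `Inner`, `Flat`, and `joinOnPositions` are hypothetical plain-Java stand-ins for tuples and a position-based hash join, not Flink classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-ins for tuples (not Flink classes).
class FlattenForJoin {
    static final class Inner {               // like Tuple2<Long, Double>
        final long f0;
        final double f1;
        Inner(long f0, double f1) { this.f0 = f0; this.f1 = f1; }
    }

    static final class Nested {              // like Tuple2<Long, Tuple2<Long, Double>>
        final long f0;
        final Inner f1;
        Nested(long f0, Inner f1) { this.f0 = f0; this.f1 = f1; }
    }

    static final class Flat {                // like Tuple3<Long, Long, Double>
        final long f0;
        final long f1;
        final double f2;
        Flat(long f0, long f1, double f2) { this.f0 = f0; this.f1 = f1; this.f2 = f2; }

        // Access by top-level field position, as a position-based join would.
        Object field(int pos) {
            switch (pos) {
                case 0: return f0;
                case 1: return f1;
                case 2: return f2;
                default: throw new IndexOutOfBoundsException("no field " + pos);
            }
        }
    }

    // Moves the nested fields up to top-level positions 1 and 2.
    static Flat flatten(Nested n) {
        return new Flat(n.f0, n.f1.f0, n.f1.f1);
    }

    // Hash join on one top-level field position of each input.
    static List<Flat[]> joinOnPositions(List<Flat> build, int buildPos, List<Flat> probe, int probePos) {
        Map<Object, List<Flat>> table = new HashMap<>();
        for (Flat b : build) {
            table.computeIfAbsent(b.field(buildPos), k -> new ArrayList<>()).add(b);
        }
        List<Flat[]> out = new ArrayList<>();
        for (Flat p : probe) {
            for (Flat b : table.getOrDefault(p.field(probePos), List.of())) {
                out.add(new Flat[] {b, p});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Flat> flat = new ArrayList<>();
        flat.add(flatten(new Nested(1L, new Inner(10L, 0.5))));
        flat.add(flatten(new Nested(2L, new Inner(20L, 0.25))));
        List<Flat> other = List.of(new Flat(20L, 0L, 0.0));
        // Position 1 holds the former nested Long field.
        List<Flat[]> joined = joinOnPositions(flat, 1, other, 0);
        System.out.println(joined.size() + " match, key " + joined.get(0)[0].f0); // prints 1 match, key 2
    }
}
```

Keys are compared with `equals` on boxed values, so both join positions must hold the same type; in this sketch a `Long` never matches a `Double`.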
Hi Vasia,
thanks for the nice wrap-up :-) All of these would be very valuable enhancements of the system, IMO. I'd suggest you create JIRAs for the individual points. It would be very cool, if you'd pick up some of the tasks. :-) Cheers, Fabian 2014-09-04 20:43 GMT+02:00 Vasiliki Kalavri <[hidden email]>: > Hello all, > > thank you Stephan and Aljoscha for the great input! > > Regarding my initial issue in this e-mail, I have solved it a while ago, by > (1) flattening the input before the join and (2) realizing that there was > an easier way to implement my algorithm than what I was trying to do o.O > > However, several interesting discussion points came out of this, so let me > try to summarize: > > (1) Support joining the solution set, using key selectors. > This is the initial problem that started this discussion. As Stephan > explains above, a possible solution can be providing explicit functions > "joinWithSolution" and "coGroupWithSolution" to make sure the keys used are > valid. > > (2) Allow operations, other than join and cogroup, on the solution set. > For this one, I would like to first understand whether restricting the > operations to join and cogroup was a requirement for some reason (like > disallowing writing invalid constructs) or whether you did not see the use > case for it. The solution here can be using special functions or > constructs, like Aljoscha suggests. > > (3) Support types of iterations where the result does not have to be in a > "solution set" form. An example is the one Aljoscha gives with outputing > one element per iteration and concatenating them. I like this idea, but I > think this requires a bit more thought and should be something more > general. > > Do you have any other related issues to add? > > Personally, I'm interested in all of the above and I believe I can dedicate > some time to work on them :) > So, what do you think? > > Cheers, > V. 
|
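Sample 2 in Stephan's message earlier in this thread fetches per-subtask state through a static `StateBroker`, using `get` followed by `putIfAbsent`. The same broker can be written more compactly with `ConcurrentHashMap.computeIfAbsent`, which performs the lookup-or-create step atomically. The sketch below is plain Java; `String` stands in for the thread's `Edge` type. Using a concurrent map for the inner per-subtask state, in case functions of the same subtask run in different threads, is an assumption rather than something the thread specifies.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * JVM-wide broker that hands every function instance with the same subtask
 * index the same state map. Static state is shared only within one JVM, so
 * this relies on those instances running in the same process.
 */
final class StateBroker {
    // subtask index -> state of that subtask (String stands in for Edge)
    private static final ConcurrentHashMap<Integer, Map<Long, String>> BROKER =
            new ConcurrentHashMap<>();

    private StateBroker() {}

    /** Returns the map for the given subtask, creating it atomically on first access. */
    static Map<Long, String> getForSubtask(int subtask) {
        return BROKER.computeIfAbsent(subtask, k -> new ConcurrentHashMap<>());
    }
}
```

Inside a rich function, `open()` would then call `StateBroker.getForSubtask(getRuntimeContext().getIndexOfThisSubtask())`, as in Stephan's sample.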
Hi,
I've created issues for (1) <https://issues.apache.org/jira/browse/FLINK-1091> and (2) <https://issues.apache.org/jira/browse/FLINK-1092>. Regarding (3), I'm not quite sure how it should be done or whether it should be part of a more general iteration type, so I'd like to think about it a bit more. Any thoughts and comments will be much appreciated :) Cheers, V. On 4 September 2014 21:07, Fabian Hueske <[hidden email]> wrote: > Hi Vasia, > > thanks for the nice wrap-up :-) > All of these would be very valuable enhancements of the system, IMO. > > I'd suggest you create JIRAs for the individual points. > It would be very cool, if you'd pick up some of the tasks. :-) > > Cheers, Fabian |
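For point (3), the iteration type Aljoscha described, where every superstep emits one element and the result is the concatenation of those elements, can be modelled outside Flink as a loop that threads a state through a step function and collects what each step emits. This is only a single-machine sketch: `CollectingIteration` and `Step` are hypothetical names, not an existing Flink iteration type, and the fixed iteration count stands in for whatever termination criterion such an operator would use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class CollectingIteration {
    private CollectingIteration() {}

    /** What one superstep produces: the state for the next superstep and the element it emits. */
    static final class Step<S, E> {
        final S next;
        final E emitted;

        Step(S next, E emitted) {
            this.next = next;
            this.emitted = emitted;
        }
    }

    /**
     * Runs the step function for the given number of supersteps and returns
     * the emitted elements in superstep order, i.e. their concatenation.
     */
    static <S, E> List<E> run(S initial, int iterations, Function<S, Step<S, E>> step) {
        List<E> result = new ArrayList<>();
        S state = initial;
        for (int i = 0; i < iterations; i++) {
            Step<S, E> r = step.apply(state);
            result.add(r.emitted); // append this superstep's element
            state = r.next;        // feed the new state into the next superstep
        }
        return result;
    }
}
```

For example, `CollectingIteration.<Integer, Integer>run(1, 4, s -> new CollectingIteration.Step<>(s + 1, s * s))` emits the square of the current state in each superstep and returns `[1, 4, 9, 16]`.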