Hey guys!
I've been thinking about this one today:

Say you have a stream of data in the form of (id, value) - this will evidently be a DataStream of Tuple2. I need to cache this data in some sort of static stream (perhaps even a DataSet). Then, if in the input stream, I see an id that was previously stored, I should update its value with the most recent entry.

An example:

1, 3
2, 5
6, 7
1, 5

The value cached for the id 1 should be 5.

How would you recommend caching the data? And what would be used for the update? A join function?

As far as I see things, you cannot really combine DataSets with DataStreams, although a DataSet is, in essence, just a finite stream. If this can indeed be done, some pseudocode would be nice :)

Thanks!
Andra
Hi Andra,
What you thought of turns out to be one of the core features of the Flink streaming API: Flink's operators support state. State can be partitioned by the key using keyBy(field).

You may use a MapFunction to achieve what you want, like so:

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(new Tuple2<>(1L, 3L),
                         new Tuple2<>(2L, 5L),
                         new Tuple2<>(6L, 7L),
                         new Tuple2<>(1L, 5L))
           .keyBy(0)
           .map(new StatefulMapper())
           .print();

        env.execute();
    }

The output is the following on my machine (discarding the output of the print):

    Key: 2 Previous state was: -1 Update state to: 5
    Key: 1 Previous state was: -1 Update state to: 3
    Key: 6 Previous state was: -1 Update state to: 7
    Key: 1 Previous state was: 3 Update state to: 5

Cheers,
Max
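For anyone reading along without a Flink setup, the behaviour the keyed state gives you can be sketched in plain Java: one independent state slot per key, defaulting to -1L until the key is first seen, with the latest value always winning. This is only an illustration of the semantics, not Flink API, and with parallelism greater than one the line order of a real run may differ from the output above.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyedStateSim {

    // Replay the stream, keeping one state slot per key (default -1L,
    // matching the default used in the Flink mapper in this thread).
    static Map<Long, Long> run(long[][] stream) {
        Map<Long, Long> state = new HashMap<>();
        for (long[] record : stream) {
            long prev = state.getOrDefault(record[0], -1L);
            System.out.println("Key: " + record[0]
                + " Previous state was: " + prev
                + " Update state to: " + record[1]);
            state.put(record[0], record[1]); // latest value per id wins
        }
        return state;
    }

    public static void main(String[] args) {
        // the (id, value) records from the example, in arrival order
        Map<Long, Long> state = run(new long[][]{{1, 3}, {2, 5}, {6, 7}, {1, 5}});
        // state.get(1L) is now 5 - exactly the "cache" behaviour asked for
    }
}
```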
Oops, forgot the mapper :)
    static class StatefulMapper extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private OperatorState<Long> counter;

        @Override
        public Tuple2<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
            System.out.println("Key: " + value.f0 +
                " Previous state was: " + counter.value() +
                " Update state to: " + value.f1);
            counter.update(value.f1);
            return value;
        }

        @Override
        public void open(Configuration config) {
            counter = getRuntimeContext().getKeyValueState("mystate", Long.class, -1L);
        }
    }
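Since the original question framed this as detecting updates, it may also help to see that the same per-key map makes "only react when an id was already cached" a one-line check. Again, this is a plain-Java sketch of the idea, not Flink code; in Flink you would put the containsKey-style check inside the keyed mapper by testing whether the state still holds its default value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UpdatesOnly {

    // Keep only records whose id was seen before, i.e. genuine updates.
    static List<long[]> updates(long[][] stream) {
        Map<Long, Long> state = new HashMap<>();
        List<long[]> result = new ArrayList<>();
        for (long[] record : stream) {
            if (state.containsKey(record[0])) {
                result.add(record); // id already cached: this record is an update
            }
            state.put(record[0], record[1]); // cache / refresh the latest value
        }
        return result;
    }

    public static void main(String[] args) {
        long[][] stream = {{1, 3}, {2, 5}, {6, 7}, {1, 5}};
        for (long[] u : updates(stream)) {
            System.out.println(u[0] + " -> " + u[1]); // only (1, 5) qualifies
        }
    }
}
```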
Thanks Max ^^