Hi list,
I am surprised by the behaviour of the code below. In particular, I am puzzled by the fact that events do not seem to enter the window in order. What am I doing wrong? Here's what I don't understand. This test outputs the following error: java.lang.AssertionError: expected:<[[10 "Join(Left,Right)" {"Left:@t": 10, "Left:attr": 1, "Right:@t": 9, "Right:val": 1}], [15 "Join(Left,Right)" {"Left:@t": 14, "Left:attr": 1, "Right:@t": 15, "Right:val": 1}]]> but was:<[[9 "None" {"times": [9]}], [12 "None" {"times": [12]}], [17 "None" {"times": [17]}], [9 "None" {"times": [9, 15]}], [9 "None" {"times": [9, 15, 10]}], [9 "None" {"times": [9, 15, 10, 14]}]]> Now, the test is not complete, so it's not surprising that it fails, but what really puzzles me is that there appears to be a moment when my window contains an event at time 9 and an event at time 15, but does not yet include the events at times 10 and 14, which should be part of the same stream (and are indeed added later). This code uses the 1.0.1 version of flink-java, flink-streaming-java_2.11 and flink-clients_2.11 (and junit 4.12), running under Java 8 with the relevant parts of the pom.xml uncommented. 
package enx.cep;

import static org.junit.Assert.assertEquals;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;
import org.junit.Test;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
 * Exercises a keyed union of a "Left" and a "Right" stream in Flink and collects the
 * job's output back into the test JVM over a local socket.
 */
public class AlgebraTest {

    /** How long the collector thread waits in {@code accept()} before re-checking job state. */
    private static final int ACCEPT_POLL_MILLIS = 500;

    /**
     * Feeds six messages through a keyed global window and compares the collected output
     * with the expected join result.
     *
     * <p>The window function below does not perform the join yet: on every element it emits
     * a "None" message listing the timestamps currently held in the window for that key.
     */
    @Test
    public void flinkCanJoinTwoStreams() throws Exception {
        final List<Msg> inputs = list(
            Msg(9, "Right", "val", 1),
            Msg(10, "Left", "attr", 1),
            Msg(12, "Left", "attr", 2),
            Msg(14, "Left", "attr", 1),
            Msg(15, "Right", "val", 1),
            Msg(17, "Right", "val", 3));
        final List<Msg> expected = list(
            Msg(10, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1, "Right:@t", 9, "Left:@t", 10),
            Msg(15, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1, "Right:@t", 15, "Left:@t", 14));

        final List<Msg> output = runStreamAlg(inputs, source -> {
            final DataStream<Msg> RightSource = source.filter(msg -> "Right".equals(msg.type));
            final DataStream<Msg> LeftSource = source.filter(msg -> "Left".equals(msg.type));

            // Join Left & Right streams on
            // Left.attr == Right.val && abs(Left.t - Right.t) < 5
            final DataStream<Msg> joined = LeftSource.union(RightSource)
                .keyBy(msg -> {
                    // Both sides are keyed on the value they are joined on.
                    if ("Right".equals(msg.type)) {
                        return msg.attrs.get("val");
                    } else if ("Left".equals(msg.type)) {
                        return msg.attrs.get("attr");
                    } else {
                        throw new RuntimeException();
                    }
                })
                .window(GlobalWindows.create())
                // Fire on every incoming element.
                .trigger(CountTrigger.of(1))
                .apply(new WindowFunction<Msg, Msg, Object, GlobalWindow>() {
                    @Override
                    public void apply(Object _key, GlobalWindow _w, Iterable<Msg> ins,
                                      Collector<Msg> collector) throws Exception {
                        List<Integer> times = StreamSupport.stream(ins.spliterator(), false)
                            .map(m -> m.timestamp)
                            .collect(Collectors.toList());
                        // Stamp the output with the smallest timestamp in the window.
                        collector.collect(Msg(times.stream().mapToInt(i -> i).min().getAsInt(),
                            "None", "times", times));
                    }
                });

            return joined;
        });
        assertEquals(expected, output);
    }

    /**
     * Runs {@code fn} over a stream built from {@code input} and returns every message the
     * resulting stream emitted, in the order the collector thread received them.
     *
     * <p>Each emitted message is sent by the sink over its own connection to a local
     * {@link ServerSocket}; a collector thread accepts those connections and deserializes
     * the messages into the returned list.
     *
     * @param input messages to feed into the job; {@code Msg.timestamp} is treated as seconds
     *              and converted to milliseconds for Flink
     * @param fn    transformation under test
     * @return the messages produced by the transformed stream
     * @throws RuntimeException wrapping any failure of the job, the socket setup, or an
     *                          interruption while waiting for the collector thread
     */
    private final List<Msg> runStreamAlg(List<Msg> input,
                                         Function<DataStream<Msg>, DataStream<Msg>> fn) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        final DataStream<Msg> source = env.fromCollection(input)
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Msg>() {
                @Override
                public long extractAscendingTimestamp(Msg msg) {
                    // 1000L forces long arithmetic; int * int would overflow before widening.
                    return msg.timestamp * 1000L;
                }
            });
        final DataStream<Msg> transformed = fn.apply(source);

        final List<Msg> res = new ArrayList<>();
        // Set once env.execute() has returned (normally or not), i.e. once no further sink
        // invocation can open a new connection.
        final AtomicBoolean jobFinished = new AtomicBoolean(false);
        try (final ServerSocket server = new ServerSocket(0)) {
            final int serverPort = server.getLocalPort();
            // Bound every accept() so the collector can never block forever, even when the
            // job emits nothing at all.
            server.setSoTimeout(ACCEPT_POLL_MILLIS);

            transformed.addSink(m -> {
                try (final Socket client = new Socket("localhost", serverPort);
                     final ObjectOutputStream toServer =
                         new ObjectOutputStream(client.getOutputStream())) {
                    toServer.writeObject(m);
                    toServer.flush();
                }
            });

            final Thread t = new Thread(() -> {
                while (true) {
                    // Sample the flag BEFORE accept(): if the job had already finished when
                    // this accept() started, every connection was already queued, so a
                    // timeout means the queue is drained. Checking after the timeout instead
                    // could miss a connection that arrived just after the timeout fired.
                    final boolean finishedBeforeAccept = jobFinished.get();
                    try (final Socket conn = server.accept();
                         final ObjectInputStream in =
                             new ObjectInputStream(conn.getInputStream())) {
                        res.add((Msg) in.readObject());
                    } catch (SocketTimeoutException e) {
                        if (finishedBeforeAccept) {
                            return;
                        }
                        // Job still running (or only just finished): poll again.
                    } catch (java.io.IOException | ClassNotFoundException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            t.start();
            try {
                // A job failure is propagated (wrapped below) rather than printed and
                // ignored, so a broken job fails the test instead of yielding partial output.
                env.execute();
            } finally {
                jobFinished.set(true);
                // join() also makes the collector's writes to res visible to this thread.
                t.join();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return res;
    }

    /**
     * Builds a mutable list from one mandatory element followed by any number of others.
     *
     * @param elem   first element
     * @param others remaining elements, in order
     * @return a new {@link ArrayList} containing {@code elem} followed by {@code others}
     */
    @SafeVarargs
    private static <T> List<T> list(T elem, T... others) {
        final List<T> res = new ArrayList<>(others.length + 1);
        res.add(elem);
        Collections.addAll(res, others);
        return res;
    }

    /** Shorthand factory for {@link Msg#Msg(int, String, Object...)}. */
    private static Msg Msg(int timestamp, String type, Object... attrs) {
        return new Msg(timestamp, type, attrs);
    }

    /**
     * A timestamped, typed message carrying a map of named attributes.
     *
     * <p>Instances are compared by value. This matters because the output messages are
     * deserialized copies, so an identity-based {@code equals} would make
     * {@code assertEquals(expected, output)} fail unconditionally.
     */
    private static class Msg implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String type;
        private final int timestamp;
        private final Map<String, Object> attrs;

        /**
         * @param timestamp message timestamp
         * @param type      message type tag
         * @param attrs     alternating attribute names and values: name1, value1, name2, ...
         * @throws IllegalArgumentException if {@code attrs} has odd length or a name is not
         *                                  a {@link String}
         */
        public Msg(int timestamp, String type, Object... attrs) {
            this.timestamp = timestamp;
            this.type = type;
            this.attrs = new HashMap<>();
            if (attrs.length % 2 != 0) {
                throw new IllegalArgumentException();
            }
            for (int i = 0; i < attrs.length; i += 2) {
                if (!(attrs[i] instanceof String)) {
                    throw new IllegalArgumentException();
                }
                this.attrs.put((String) attrs[i], attrs[i + 1]);
            }
        }

        @Override
        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            if (!(other instanceof Msg)) {
                return false;
            }
            final Msg that = (Msg) other;
            return this.timestamp == that.timestamp
                && Objects.equals(this.type, that.type)
                && this.attrs.equals(that.attrs);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.timestamp, this.type, this.attrs);
        }

        /** Renders as {@code [timestamp "type" {"name": value, ...}]} with names sorted. */
        @Override
        public String toString() {
            final String renderedAttrs = this.attrs.entrySet().stream()
                .sorted(Map.Entry.<String, Object>comparingByKey())
                .map(e -> String.format("\"%s\": %s", e.getKey(), e.getValue()))
                .collect(Collectors.joining(", "));
            return String.format("[%d \"%s\" {%s}]", this.timestamp, this.type, renderedAttrs);
        }
    }
}
Hi,
Flink does not make any guarantees about the order of arriving elements except in the case of one-to-one forwarding patterns. That is, only for map/flatMap/filter and such operations will the order in which two successive operations see their elements be the same. Could you please describe in prose form what the expected outcome of your windowing specification is. We could start from this and try to figure out how to make Flink behave as it should. Cheers, Aljoscha On Thu, 14 Apr 2016 at 16:32 Gary Verhaegen <[hidden email]> wrote: > Hi list, > > I am surprised by the behaviour of the code below. In particular, I am > puzzled by the fact that events do not seem to enter the window in order. > What am I doing wrong? > > Here's what I don't understand. This test outputs the following error: > > java.lang.AssertionError: expected:<[[10 "Join(Left,Right)" {"Left:@t": > 10, > "Left:attr": 1, "Right:@t": 9, "Right:val": 1}], [15 "Join(Left,Right)" > {"Left:@t": 14, "Left:attr": 1, "Right:@t": 15, "Right:val": 1}]]> but > was:<[[9 "None" {"times": [9]}], [12 "None" {"times": [12]}], [17 "None" > {"times": [17]}], [9 "None" {"times": [9, 15]}], [9 "None" {"times": [9, > 15, 10]}], [9 "None" {"times": [9, 15, 10, 14]}]]> > > Now, the test is not complete, so it's not surprising that it fails, but > what really puzzles me is that there appears to be a moment when my window > contains an event at time 9 and an event at time 15, but does not yet > include the events at times 10 and 14, which should be part of the same > stream (and are indeed added later). > > This code uses the 1.0.1 version of flink-java, flink-streaming-java_2.11 > and flink-clients_2.11 (and junit 4.12), running under Java 8 with the > relevant parts of the pom.xml uncommented. 
> > package enx.cep; > import static org.junit.Assert.assertEquals; > import org.apache.flink.streaming.api.TimeCharacteristic;import > org.apache.flink.streaming.api.datastream.DataStream;import > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import > org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;import > org.apache.flink.streaming.api.functions.windowing.WindowFunction;import > org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;import > org.apache.flink.streaming.api.windowing.triggers.CountTrigger;import > org.apache.flink.streaming.api.windowing.windows.GlobalWindow;import > org.apache.flink.util.Collector;import org.junit.Test; > import java.io.*;import java.net.ServerSocket;import > java.net.Socket;import java.net.SocketTimeoutException;import > java.util.*; > import java.util.function.Function;import > java.util.stream.Collectors;import java.util.stream.StreamSupport; > public class AlgebraTest { > @Test public void flinkCanJoinTwoStreams() throws Exception { > final List<Msg> inputs = list( > Msg(9, "Right", "val", 1), > Msg(10, "Left", "attr", 1), > Msg(12, "Left", "attr", 2), > Msg(14, "Left", "attr", 1), > Msg(15, "Right", "val", 1), > Msg(17, "Right", "val", 3)); > final List<Msg> expected = list( > Msg(10, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1, > "Right:@t", 9, "Left:@t", 10), > Msg(15, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1, > "Right:@t", 15, "Left:@t", 14)); > final List<Msg> output = runStreamAlg(inputs, source -> { > final DataStream<Msg> RightSource = source.filter(msg -> > "Right".equals(msg.type)); > final DataStream<Msg> LeftSource = source.filter(msg -> > "Left".equals(msg.type)); > > // Join Left & Right streams on > // Left.attr == Right.val && abs(Left.t - Right.t) < 5 > final DataStream<Msg> joined = LeftSource.union(RightSource) > .keyBy(msg -> { > if ("Right".equals(msg.type)) { > return msg.attrs.get("val"); > } else if ("Left".equals(msg.type)) { > 
return msg.attrs.get("attr"); > } else { > throw new RuntimeException(); > } > }) > .window(GlobalWindows.create()) > .trigger(CountTrigger.of(1)) > .apply(new WindowFunction<Msg, Msg, Object, > GlobalWindow>() { > @Override > public void apply(Object _key, GlobalWindow > _w, Iterable<Msg> ins, Collector<Msg> collector) throws Exception { > List<Integer> times = > StreamSupport.stream(ins.spliterator(), false) > .map(m -> m.timestamp) > .collect(Collectors.toList()); > > collector.collect(Msg(times.stream().mapToInt(i -> > i).min().getAsInt(), > "None", "times", times)); > } > }); > > return joined; > }); > assertEquals(expected, output); > } > > private final List<Msg> runStreamAlg(List<Msg> input, > Function<DataStream<Msg>, DataStream<Msg>> fn) { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > final DataStream<Msg> source = env.fromCollection(input) > .assignTimestampsAndWatermarks(new > AscendingTimestampExtractor<Msg>() { > @Override > public long extractAscendingTimestamp(Msg msg) { > return msg.timestamp * 1000; > } > }); > final DataStream<Msg> transformed = fn.apply(source); > > final List<Msg> res = new ArrayList<>(); > try (final ServerSocket server = new ServerSocket(0)) { > final int serverPort = server.getLocalPort(); > > transformed.addSink(m -> { > try (final Socket client = new Socket("localhost", > serverPort)) { > final ObjectOutputStream toServer = new > ObjectOutputStream(client.getOutputStream()); > toServer.writeObject(m); > toServer.flush(); > toServer.close(); > } > }); > > final Thread t = new Thread(() -> { > while (true) { > try (final ObjectInputStream in = new > ObjectInputStream(server.accept().getInputStream())) { > res.add((Msg) in.readObject()); > server.setSoTimeout(500); > } catch (SocketTimeoutException e) { > return; > } catch (java.io.IOException | ClassNotFoundException > e) { > throw new RuntimeException(e); > 
} > } > }); > t.start(); > try { > env.execute(); > } catch (Exception e) { > e.printStackTrace(); > } > t.join(); > } catch (Exception e) { > throw new RuntimeException(e); > } > return res; > } > > private static <T> List<T> list(T elem, T... others) { > final List<T> res = new ArrayList<>(); > res.add(elem); > for(T t: others) { > res.add(t); > } > return res; > } > > private static Msg Msg(int timestamp, String type, Object... attrs) { > return new Msg(timestamp, type, attrs); > } > > private static class Msg implements Serializable { > private final String type; > private final int timestamp; > private final Map<String, Object> attrs; > public Msg(int timestamp, String type, Object... attrs) { > this.timestamp = timestamp; > this.type = type; > this.attrs = new HashMap<>(); > if (attrs.length % 2 != 0) throw new > IllegalArgumentException(); > for (int i = 0; i < attrs.length; i += 2) { > if (!(attrs[i] instanceof String)) throw new > IllegalArgumentException(); > this.attrs.put((String) attrs[i], attrs[i+1]); > } > } > > public String toString() { > return String.format("[%d \"%s\" {%s}]", > this.timestamp, > this.type, > this.attrs.entrySet().stream() > .sorted((e1, e2) -> > e1.getKey().compareTo(e2.getKey())) > .map(e -> String.format("\"%s\": %s", > e.getKey(), e.getValue())) > .reduce((acc, el) -> acc + ", " + el) > .orElseGet(() -> "")); > } > }} > |
Hi Aljoscha,
What I'm looking for is an operator that joins two streams together, but keeps the events in timestamp order. What I was trying to do with the window specification comes down to: for each event on that stream, I want to call this function with this event and all of the events that arrived within 5 minutes after it. So conceptually a sliding window, except that the width is defined in terms of time and the step in terms of count. What I will really need to do is probably manage the window myself in operator state (because in many cases I expect that I will not need the whole window, so it may be interesting to be able to evaluate that eagerly), but I think I really need the events to arrive in order. On 14 April 2016 at 17:04, Aljoscha Krettek <[hidden email]> wrote: > Hi, > Flink does not make any guarantees about the order of arriving elements > except in the case of one-to-one forwarding patterns. That is, only for > map/flatMap/filter and such operations will the order in which two > successive operations see their elements be the same. > > Could you please describe in prose form what the expected outcome of your > windowing specification is. We could start from this and try to figure out > how to make Flink behave as it should. > > Cheers, > Aljoscha > > On Thu, 14 Apr 2016 at 16:32 Gary Verhaegen <[hidden email]> > wrote: > > > Hi list, > > > > I am surprised by the behaviour of the code below. In particular, I am > > puzzled by the fact that events do not seem to enter the window in order. > > What am I doing wrong? > > > > Here's what I don't understand. 
This test outputs the following error: > > > > java.lang.AssertionError: expected:<[[10 "Join(Left,Right)" {"Left:@t": > > 10, > > "Left:attr": 1, "Right:@t": 9, "Right:val": 1}], [15 "Join(Left,Right)" > > {"Left:@t": 14, "Left:attr": 1, "Right:@t": 15, "Right:val": 1}]]> but > > was:<[[9 "None" {"times": [9]}], [12 "None" {"times": [12]}], [17 "None" > > {"times": [17]}], [9 "None" {"times": [9, 15]}], [9 "None" {"times": [9, > > 15, 10]}], [9 "None" {"times": [9, 15, 10, 14]}]]> > > > > Now, the test is not complete, so it's not surprising that it fails, but > > what really puzzles me is that there appears to be a moment when my > window > > contains an event at time 9 and an event at time 15, but does not yet > > include the events at times 10 and 14, which should be part of the same > > stream (and are indeed added later). > > > > This code uses the 1.0.1 version of flink-java, flink-streaming-java_2.11 > > and flink-clients_2.11 (and junit 4.12), running under Java 8 with the > > relevant parts of the pom.xml uncommented. 
> > > > package enx.cep; > > import static org.junit.Assert.assertEquals; > > import org.apache.flink.streaming.api.TimeCharacteristic;import > > org.apache.flink.streaming.api.datastream.DataStream;import > > > > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import > > > org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;import > > org.apache.flink.streaming.api.functions.windowing.WindowFunction;import > > org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;import > > org.apache.flink.streaming.api.windowing.triggers.CountTrigger;import > > org.apache.flink.streaming.api.windowing.windows.GlobalWindow;import > > org.apache.flink.util.Collector;import org.junit.Test; > > import java.io.*;import java.net.ServerSocket;import > > java.net.Socket;import java.net.SocketTimeoutException;import > > java.util.*; > > import java.util.function.Function;import > > java.util.stream.Collectors;import java.util.stream.StreamSupport; > > public class AlgebraTest { > > @Test public void flinkCanJoinTwoStreams() throws Exception { > > final List<Msg> inputs = list( > > Msg(9, "Right", "val", 1), > > Msg(10, "Left", "attr", 1), > > Msg(12, "Left", "attr", 2), > > Msg(14, "Left", "attr", 1), > > Msg(15, "Right", "val", 1), > > Msg(17, "Right", "val", 3)); > > final List<Msg> expected = list( > > Msg(10, "Join(Left,Right)", "Right:val", 1, "Left:attr", > 1, > > "Right:@t", 9, "Left:@t", 10), > > Msg(15, "Join(Left,Right)", "Right:val", 1, "Left:attr", > 1, > > "Right:@t", 15, "Left:@t", 14)); > > final List<Msg> output = runStreamAlg(inputs, source -> { > > final DataStream<Msg> RightSource = source.filter(msg -> > > "Right".equals(msg.type)); > > final DataStream<Msg> LeftSource = source.filter(msg -> > > "Left".equals(msg.type)); > > > > // Join Left & Right streams on > > // Left.attr == Right.val && abs(Left.t - Right.t) < 5 > > final DataStream<Msg> joined = LeftSource.union(RightSource) > > .keyBy(msg -> { > > if 
("Right".equals(msg.type)) { > > return msg.attrs.get("val"); > > } else if ("Left".equals(msg.type)) { > > return msg.attrs.get("attr"); > > } else { > > throw new RuntimeException(); > > } > > }) > > .window(GlobalWindows.create()) > > .trigger(CountTrigger.of(1)) > > .apply(new WindowFunction<Msg, Msg, Object, > > GlobalWindow>() { > > @Override > > public void apply(Object _key, GlobalWindow > > _w, Iterable<Msg> ins, Collector<Msg> collector) throws Exception { > > List<Integer> times = > > StreamSupport.stream(ins.spliterator(), false) > > .map(m -> m.timestamp) > > .collect(Collectors.toList()); > > > > collector.collect(Msg(times.stream().mapToInt(i -> > > i).min().getAsInt(), > > "None", "times", times)); > > } > > }); > > > > return joined; > > }); > > assertEquals(expected, output); > > } > > > > private final List<Msg> runStreamAlg(List<Msg> input, > > Function<DataStream<Msg>, DataStream<Msg>> fn) { > > final StreamExecutionEnvironment env = > > StreamExecutionEnvironment.getExecutionEnvironment(); > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > > final DataStream<Msg> source = env.fromCollection(input) > > .assignTimestampsAndWatermarks(new > > AscendingTimestampExtractor<Msg>() { > > @Override > > public long extractAscendingTimestamp(Msg msg) { > > return msg.timestamp * 1000; > > } > > }); > > final DataStream<Msg> transformed = fn.apply(source); > > > > final List<Msg> res = new ArrayList<>(); > > try (final ServerSocket server = new ServerSocket(0)) { > > final int serverPort = server.getLocalPort(); > > > > transformed.addSink(m -> { > > try (final Socket client = new Socket("localhost", > > serverPort)) { > > final ObjectOutputStream toServer = new > > ObjectOutputStream(client.getOutputStream()); > > toServer.writeObject(m); > > toServer.flush(); > > toServer.close(); > > } > > }); > > > > final Thread t = new Thread(() -> { > > while (true) { > > try (final ObjectInputStream in = new > > 
ObjectInputStream(server.accept().getInputStream())) { > > res.add((Msg) in.readObject()); > > server.setSoTimeout(500); > > } catch (SocketTimeoutException e) { > > return; > > } catch (java.io.IOException | ClassNotFoundException > > e) { > > throw new RuntimeException(e); > > } > > } > > }); > > t.start(); > > try { > > env.execute(); > > } catch (Exception e) { > > e.printStackTrace(); > > } > > t.join(); > > } catch (Exception e) { > > throw new RuntimeException(e); > > } > > return res; > > } > > > > private static <T> List<T> list(T elem, T... others) { > > final List<T> res = new ArrayList<>(); > > res.add(elem); > > for(T t: others) { > > res.add(t); > > } > > return res; > > } > > > > private static Msg Msg(int timestamp, String type, Object... attrs) { > > return new Msg(timestamp, type, attrs); > > } > > > > private static class Msg implements Serializable { > > private final String type; > > private final int timestamp; > > private final Map<String, Object> attrs; > > public Msg(int timestamp, String type, Object... attrs) { > > this.timestamp = timestamp; > > this.type = type; > > this.attrs = new HashMap<>(); > > if (attrs.length % 2 != 0) throw new > > IllegalArgumentException(); > > for (int i = 0; i < attrs.length; i += 2) { > > if (!(attrs[i] instanceof String)) throw new > > IllegalArgumentException(); > > this.attrs.put((String) attrs[i], attrs[i+1]); > > } > > } > > > > public String toString() { > > return String.format("[%d \"%s\" {%s}]", > > this.timestamp, > > this.type, > > this.attrs.entrySet().stream() > > .sorted((e1, e2) -> > > e1.getKey().compareTo(e2.getKey())) > > .map(e -> String.format("\"%s\": %s", > > e.getKey(), e.getValue())) > > .reduce((acc, el) -> acc + ", " + el) > > .orElseGet(() -> "")); > > } > > }} > > > |
Hi,
yes, I'm afraid you need a custom operator for that. (We are working on providing built-in support for this, though) I sketched an Operator that does the sorting and also wrote a quick example that uses it: SortedWindowOperator: https://gist.github.com/aljoscha/6600bc1121b7f8a0f68b89988dd341bd SortedWindowExample: https://gist.github.com/aljoscha/959fc61aff2cd774fe60da711f5ae40b It puts incoming elements into buckets based on the timestamp and sorts them once the watermark passes the end of a bucket. Take a look at processWatermark(), there you would fill in your custom logic. Cheers, Aljoscha On Fri, 15 Apr 2016 at 14:41 Gary Verhaegen <[hidden email]> wrote: > Hi Aljoscha, > > What I'm looking for is an operator that joins two streams together, but > keeps the events in timestamp order. > > What I was trying to do with the window specification comes down to: for > each event on that stream, I want to call this function with this event and > all of the events that arrived within 5 minutes after it. So conceptually a > sliding window, except that the width is defined in terms of time and the > step in terms of count. > > What I will really need to do is probably manage the window myself in > operator state (because in many cases I expect that I will not need the > whole window, so it may be interesting to be able to evaluate that > eagerly), but I think I really need the events to arrive in order. > > On 14 April 2016 at 17:04, Aljoscha Krettek <[hidden email]> wrote: > > > Hi, > > Flink does not make any guarantees about the order of arriving elements > > except in the case of one-to-one forwarding patterns. That is, only for > > map/flatMap/filter and such operations will the order in which two > > successive operations see their elements be the same. > > > > Could you please describe in prose form what the expected outcome of your > > windowing specification is. We could start from this and try to figure > out > > how to make Flink behave as it should. 
> > > > Cheers, > > Aljoscha > > > > On Thu, 14 Apr 2016 at 16:32 Gary Verhaegen <[hidden email]> > > wrote: > > > > > Hi list, > > > > > > I am surprised by the behaviour of the code below. In particular, I am > > > puzzled by the fact that events do not seem to enter the window in > order. > > > What am I doing wrong? > > > > > > Here's what I don't understand. This test outputs the following error: > > > > > > java.lang.AssertionError: expected:<[[10 "Join(Left,Right)" {"Left:@t > ": > > > 10, > > > "Left:attr": 1, "Right:@t": 9, "Right:val": 1}], [15 > "Join(Left,Right)" > > > {"Left:@t": 14, "Left:attr": 1, "Right:@t": 15, "Right:val": 1}]]> but > > > was:<[[9 "None" {"times": [9]}], [12 "None" {"times": [12]}], [17 > "None" > > > {"times": [17]}], [9 "None" {"times": [9, 15]}], [9 "None" {"times": > [9, > > > 15, 10]}], [9 "None" {"times": [9, 15, 10, 14]}]]> > > > > > > Now, the test is not complete, so it's not surprising that it fails, > but > > > what really puzzles me is that there appears to be a moment when my > > window > > > contains an event at time 9 and an event at time 15, but does not yet > > > include the events at times 10 and 14, which should be part of the same > > > stream (and are indeed added later). > > > > > > This code uses the 1.0.1 version of flink-java, > flink-streaming-java_2.11 > > > and flink-clients_2.11 (and junit 4.12), running under Java 8 with the > > > relevant parts of the pom.xml uncommented. 
> > > > > > package enx.cep; > > > import static org.junit.Assert.assertEquals; > > > import org.apache.flink.streaming.api.TimeCharacteristic;import > > > org.apache.flink.streaming.api.datastream.DataStream;import > > > > > > > > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import > > > > > > org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;import > > > > org.apache.flink.streaming.api.functions.windowing.WindowFunction;import > > > org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;import > > > org.apache.flink.streaming.api.windowing.triggers.CountTrigger;import > > > org.apache.flink.streaming.api.windowing.windows.GlobalWindow;import > > > org.apache.flink.util.Collector;import org.junit.Test; > > > import java.io.*;import java.net.ServerSocket;import > > > java.net.Socket;import java.net.SocketTimeoutException;import > > > java.util.*; > > > import java.util.function.Function;import > > > java.util.stream.Collectors;import java.util.stream.StreamSupport; > > > public class AlgebraTest { > > > @Test public void flinkCanJoinTwoStreams() throws Exception { > > > final List<Msg> inputs = list( > > > Msg(9, "Right", "val", 1), > > > Msg(10, "Left", "attr", 1), > > > Msg(12, "Left", "attr", 2), > > > Msg(14, "Left", "attr", 1), > > > Msg(15, "Right", "val", 1), > > > Msg(17, "Right", "val", 3)); > > > final List<Msg> expected = list( > > > Msg(10, "Join(Left,Right)", "Right:val", 1, > "Left:attr", > > 1, > > > "Right:@t", 9, "Left:@t", 10), > > > Msg(15, "Join(Left,Right)", "Right:val", 1, > "Left:attr", > > 1, > > > "Right:@t", 15, "Left:@t", 14)); > > > final List<Msg> output = runStreamAlg(inputs, source -> { > > > final DataStream<Msg> RightSource = source.filter(msg -> > > > "Right".equals(msg.type)); > > > final DataStream<Msg> LeftSource = source.filter(msg -> > > > "Left".equals(msg.type)); > > > > > > // Join Left & Right streams on > > > // Left.attr == Right.val && abs(Left.t - Right.t) < 5 > 
> > final DataStream<Msg> joined = > LeftSource.union(RightSource) > > > .keyBy(msg -> { > > > if ("Right".equals(msg.type)) { > > > return msg.attrs.get("val"); > > > } else if ("Left".equals(msg.type)) { > > > return msg.attrs.get("attr"); > > > } else { > > > throw new RuntimeException(); > > > } > > > }) > > > .window(GlobalWindows.create()) > > > .trigger(CountTrigger.of(1)) > > > .apply(new WindowFunction<Msg, Msg, Object, > > > GlobalWindow>() { > > > @Override > > > public void apply(Object _key, GlobalWindow > > > _w, Iterable<Msg> ins, Collector<Msg> collector) throws Exception { > > > List<Integer> times = > > > StreamSupport.stream(ins.spliterator(), false) > > > .map(m -> m.timestamp) > > > .collect(Collectors.toList()); > > > > > > collector.collect(Msg(times.stream().mapToInt(i -> > > > i).min().getAsInt(), > > > "None", "times", times)); > > > } > > > }); > > > > > > return joined; > > > }); > > > assertEquals(expected, output); > > > } > > > > > > private final List<Msg> runStreamAlg(List<Msg> input, > > > Function<DataStream<Msg>, DataStream<Msg>> fn) { > > > final StreamExecutionEnvironment env = > > > StreamExecutionEnvironment.getExecutionEnvironment(); > > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > > > final DataStream<Msg> source = env.fromCollection(input) > > > .assignTimestampsAndWatermarks(new > > > AscendingTimestampExtractor<Msg>() { > > > @Override > > > public long extractAscendingTimestamp(Msg msg) { > > > return msg.timestamp * 1000; > > > } > > > }); > > > final DataStream<Msg> transformed = fn.apply(source); > > > > > > final List<Msg> res = new ArrayList<>(); > > > try (final ServerSocket server = new ServerSocket(0)) { > > > final int serverPort = server.getLocalPort(); > > > > > > transformed.addSink(m -> { > > > try (final Socket client = new Socket("localhost", > > > serverPort)) { > > > final ObjectOutputStream toServer = new > > > ObjectOutputStream(client.getOutputStream()); > > > 
toServer.writeObject(m); > > > toServer.flush(); > > > toServer.close(); > > > } > > > }); > > > > > > final Thread t = new Thread(() -> { > > > while (true) { > > > try (final ObjectInputStream in = new > > > ObjectInputStream(server.accept().getInputStream())) { > > > res.add((Msg) in.readObject()); > > > server.setSoTimeout(500); > > > } catch (SocketTimeoutException e) { > > > return; > > > } catch (java.io.IOException | > ClassNotFoundException > > > e) { > > > throw new RuntimeException(e); > > > } > > > } > > > }); > > > t.start(); > > > try { > > > env.execute(); > > > } catch (Exception e) { > > > e.printStackTrace(); > > > } > > > t.join(); > > > } catch (Exception e) { > > > throw new RuntimeException(e); > > > } > > > return res; > > > } > > > > > > private static <T> List<T> list(T elem, T... others) { > > > final List<T> res = new ArrayList<>(); > > > res.add(elem); > > > for(T t: others) { > > > res.add(t); > > > } > > > return res; > > > } > > > > > > private static Msg Msg(int timestamp, String type, Object... > attrs) { > > > return new Msg(timestamp, type, attrs); > > > } > > > > > > private static class Msg implements Serializable { > > > private final String type; > > > private final int timestamp; > > > private final Map<String, Object> attrs; > > > public Msg(int timestamp, String type, Object... 
attrs) { > > > this.timestamp = timestamp; > > > this.type = type; > > > this.attrs = new HashMap<>(); > > > if (attrs.length % 2 != 0) throw new > > > IllegalArgumentException(); > > > for (int i = 0; i < attrs.length; i += 2) { > > > if (!(attrs[i] instanceof String)) throw new > > > IllegalArgumentException(); > > > this.attrs.put((String) attrs[i], attrs[i+1]); > > > } > > > } > > > > > > public String toString() { > > > return String.format("[%d \"%s\" {%s}]", > > > this.timestamp, > > > this.type, > > > this.attrs.entrySet().stream() > > > .sorted((e1, e2) -> > > > e1.getKey().compareTo(e2.getKey())) > > > .map(e -> String.format("\"%s\": %s", > > > e.getKey(), e.getValue())) > > > .reduce((acc, el) -> acc + ", " + el) > > > .orElseGet(() -> "")); > > > } > > > }} > > > > > > |
Not entirely related, but for the special case of writing a parallelized source that emits records in event-time order, I found the MergeIterator to be most useful. Here's an example: https://github.com/nupic-community/flink-htm/blob/eb29f97f08f3482b32228db7284f669aad8dce2e/flink-htm-streaming-scala/src/main/scala/org/numenta/nupic/flink/streaming/connectors/river/RiverSource.scala#L157
-Eron Wright > From: [hidden email] > Date: Mon, 18 Apr 2016 08:20:11 +0000 > Subject: Re: Surprising order of events in union of two streams > To: [hidden email] > CC: [hidden email] > > Hi, > yes, I'm afraid you need a custom operator for that. (We are working on > providing built-in support for this, though) > > I sketched an Operator that does the sorting and also wrote a quick example > that uses it: > SortedWindowOperator: > https://gist.github.com/aljoscha/6600bc1121b7f8a0f68b89988dd341bd > SortedWindowExample: > https://gist.github.com/aljoscha/959fc61aff2cd774fe60da711f5ae40b > > It puts incoming elements into buckets based on the timestamp and sorts > them once the watermark passes the end of a bucket. Take a look at > processWatermark(), there you would fill in your custom logic. > > Cheers, > Aljoscha > > On Fri, 15 Apr 2016 at 14:41 Gary Verhaegen <[hidden email]> > wrote: > > > Hi Aljoscha, > > > > What I'm looking for is an operator that joins two streams together, but > > keeps the events in timestamp order. > > > > What I was trying to do with the window specification comes down to: for > > each event on that stream, I want to call this function with this event and > > all of the events that arrived within 5 minutes after it. So conceptually a > > sliding window, except that the width is defined in terms of time and the > > step in terms of count. > > > > What I will really need to do is probably manage the window myself in > > operator state (because in many cases I expect that I will not need the > > whole window, so it may be interesting to be able to evaluate that > > eagerly), but I think I really need the events to arrive in order. > > > > On 14 April 2016 at 17:04, Aljoscha Krettek <[hidden email]> wrote: > > > > > Hi, > > > Flink does not make any guarantees about the order of arriving elements > > > except in the case of one-to-one forwarding patterns. 
That is, only for > > > map/flatMap/filter and such operations will the order in which two > > > successive operations see their elements be the same. > > > > > > Could you please describe in prose form what the expected outcome of your > > > windowing specification is. We could start from this and try to figure > > out > > > how to make Flink behave as it should. > > > > > > Cheers, > > > Aljoscha > > > > > > On Thu, 14 Apr 2016 at 16:32 Gary Verhaegen <[hidden email]> > > > wrote: > > > > > > > Hi list, > > > > > > > > I am surprised by the behaviour of the code below. In particular, I am > > > > puzzled by the fact that events do not seem to enter the window in > > order. > > > > What am I doing wrong? > > > > > > > > Here's what I don't understand. This test outputs the following error: > > > > > > > > java.lang.AssertionError: expected:<[[10 "Join(Left,Right)" {"Left:@t > > ": > > > > 10, > > > > "Left:attr": 1, "Right:@t": 9, "Right:val": 1}], [15 > > "Join(Left,Right)" > > > > {"Left:@t": 14, "Left:attr": 1, "Right:@t": 15, "Right:val": 1}]]> but > > > > was:<[[9 "None" {"times": [9]}], [12 "None" {"times": [12]}], [17 > > "None" > > > > {"times": [17]}], [9 "None" {"times": [9, 15]}], [9 "None" {"times": > > [9, > > > > 15, 10]}], [9 "None" {"times": [9, 15, 10, 14]}]]> > > > > > > > > Now, the test is not complete, so it's not surprising that it fails, > > but > > > > what really puzzles me is that there appears to be a moment when my > > > window > > > > contains an event at time 9 and an event at time 15, but does not yet > > > > include the events at times 10 and 14, which should be part of the same > > > > stream (and are indeed added later). > > > > > > > > This code uses the 1.0.1 version of flink-java, > > flink-streaming-java_2.11 > > > > and flink-clients_2.11 (and junit 4.12), running under Java 8 with the > > > > relevant parts of the pom.xml uncommented. 
> > > > > > > > package enx.cep; > > > > import static org.junit.Assert.assertEquals; > > > > import org.apache.flink.streaming.api.TimeCharacteristic;import > > > > org.apache.flink.streaming.api.datastream.DataStream;import > > > > > > > > > > > > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import > > > > > > > > > org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;import > > > > > > org.apache.flink.streaming.api.functions.windowing.WindowFunction;import > > > > org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;import > > > > org.apache.flink.streaming.api.windowing.triggers.CountTrigger;import > > > > org.apache.flink.streaming.api.windowing.windows.GlobalWindow;import > > > > org.apache.flink.util.Collector;import org.junit.Test; > > > > import java.io.*;import java.net.ServerSocket;import > > > > java.net.Socket;import java.net.SocketTimeoutException;import > > > > java.util.*; > > > > import java.util.function.Function;import > > > > java.util.stream.Collectors;import java.util.stream.StreamSupport; > > > > public class AlgebraTest { > > > > @Test public void flinkCanJoinTwoStreams() throws Exception { > > > > final List<Msg> inputs = list( > > > > Msg(9, "Right", "val", 1), > > > > Msg(10, "Left", "attr", 1), > > > > Msg(12, "Left", "attr", 2), > > > > Msg(14, "Left", "attr", 1), > > > > Msg(15, "Right", "val", 1), > > > > Msg(17, "Right", "val", 3)); > > > > final List<Msg> expected = list( > > > > Msg(10, "Join(Left,Right)", "Right:val", 1, > > "Left:attr", > > > 1, > > > > "Right:@t", 9, "Left:@t", 10), > > > > Msg(15, "Join(Left,Right)", "Right:val", 1, > > "Left:attr", > > > 1, > > > > "Right:@t", 15, "Left:@t", 14)); > > > > final List<Msg> output = runStreamAlg(inputs, source -> { > > > > final DataStream<Msg> RightSource = source.filter(msg -> > > > > "Right".equals(msg.type)); > > > > final DataStream<Msg> LeftSource = source.filter(msg -> > > > > "Left".equals(msg.type)); > > > > > > > > 
// Join Left & Right streams on > > > > // Left.attr == Right.val && abs(Left.t - Right.t) < 5 > > > > final DataStream<Msg> joined = > > LeftSource.union(RightSource) > > > > .keyBy(msg -> { > > > > if ("Right".equals(msg.type)) { > > > > return msg.attrs.get("val"); > > > > } else if ("Left".equals(msg.type)) { > > > > return msg.attrs.get("attr"); > > > > } else { > > > > throw new RuntimeException(); > > > > } > > > > }) > > > > .window(GlobalWindows.create()) > > > > .trigger(CountTrigger.of(1)) > > > > .apply(new WindowFunction<Msg, Msg, Object, > > > > GlobalWindow>() { > > > > @Override > > > > public void apply(Object _key, GlobalWindow > > > > _w, Iterable<Msg> ins, Collector<Msg> collector) throws Exception { > > > > List<Integer> times = > > > > StreamSupport.stream(ins.spliterator(), false) > > > > .map(m -> m.timestamp) > > > > .collect(Collectors.toList()); > > > > > > > > collector.collect(Msg(times.stream().mapToInt(i -> > > > > i).min().getAsInt(), > > > > "None", "times", times)); > > > > } > > > > }); > > > > > > > > return joined; > > > > }); > > > > assertEquals(expected, output); > > > > } > > > > > > > > private final List<Msg> runStreamAlg(List<Msg> input, > > > > Function<DataStream<Msg>, DataStream<Msg>> fn) { > > > > final StreamExecutionEnvironment env = > > > > StreamExecutionEnvironment.getExecutionEnvironment(); > > > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > > > > final DataStream<Msg> source = env.fromCollection(input) > > > > .assignTimestampsAndWatermarks(new > > > > AscendingTimestampExtractor<Msg>() { > > > > @Override > > > > public long extractAscendingTimestamp(Msg msg) { > > > > return msg.timestamp * 1000; > > > > } > > > > }); > > > > final DataStream<Msg> transformed = fn.apply(source); > > > > > > > > final List<Msg> res = new ArrayList<>(); > > > > try (final ServerSocket server = new ServerSocket(0)) { > > > > final int serverPort = server.getLocalPort(); > > > > > > > > transformed.addSink(m 
-> { > > > > try (final Socket client = new Socket("localhost", > > > > serverPort)) { > > > > final ObjectOutputStream toServer = new > > > > ObjectOutputStream(client.getOutputStream()); > > > > toServer.writeObject(m); > > > > toServer.flush(); > > > > toServer.close(); > > > > } > > > > }); > > > > > > > > final Thread t = new Thread(() -> { > > > > while (true) { > > > > try (final ObjectInputStream in = new > > > > ObjectInputStream(server.accept().getInputStream())) { > > > > res.add((Msg) in.readObject()); > > > > server.setSoTimeout(500); > > > > } catch (SocketTimeoutException e) { > > > > return; > > > > } catch (java.io.IOException | > > ClassNotFoundException > > > > e) { > > > > throw new RuntimeException(e); > > > > } > > > > } > > > > }); > > > > t.start(); > > > > try { > > > > env.execute(); > > > > } catch (Exception e) { > > > > e.printStackTrace(); > > > > } > > > > t.join(); > > > > } catch (Exception e) { > > > > throw new RuntimeException(e); > > > > } > > > > return res; > > > > } > > > > > > > > private static <T> List<T> list(T elem, T... others) { > > > > final List<T> res = new ArrayList<>(); > > > > res.add(elem); > > > > for(T t: others) { > > > > res.add(t); > > > > } > > > > return res; > > > > } > > > > > > > > private static Msg Msg(int timestamp, String type, Object... > > attrs) { > > > > return new Msg(timestamp, type, attrs); > > > > } > > > > > > > > private static class Msg implements Serializable { > > > > private final String type; > > > > private final int timestamp; > > > > private final Map<String, Object> attrs; > > > > public Msg(int timestamp, String type, Object... 
attrs) { > > > > this.timestamp = timestamp; > > > > this.type = type; > > > > this.attrs = new HashMap<>(); > > > > if (attrs.length % 2 != 0) throw new > > > > IllegalArgumentException(); > > > > for (int i = 0; i < attrs.length; i += 2) { > > > > if (!(attrs[i] instanceof String)) throw new > > > > IllegalArgumentException(); > > > > this.attrs.put((String) attrs[i], attrs[i+1]); > > > > } > > > > } > > > > > > > > public String toString() { > > > > return String.format("[%d \"%s\" {%s}]", > > > > this.timestamp, > > > > this.type, > > > > this.attrs.entrySet().stream() > > > > .sorted((e1, e2) -> > > > > e1.getKey().compareTo(e2.getKey())) > > > > .map(e -> String.format("\"%s\": %s", > > > > e.getKey(), e.getValue())) > > > > .reduce((acc, el) -> acc + ", " + el) > > > > .orElseGet(() -> "")); > > > > } > > > > }} > > > > > > > > > |
Free forum by Nabble | Edit this page |