Surprising order of events in union of two streams

Surprising order of events in union of two streams

Gary Verhaegen
Hi list,

I am surprised by the behaviour of the code below. In particular, I am
puzzled by the fact that events do not seem to enter the window in order.
What am I doing wrong?

Here's what I don't understand. This test outputs the following error:

java.lang.AssertionError: expected:<[[10 "Join(Left,Right)" {"Left:@t": 10,
"Left:attr": 1, "Right:@t": 9, "Right:val": 1}], [15 "Join(Left,Right)"
{"Left:@t": 14, "Left:attr": 1, "Right:@t": 15, "Right:val": 1}]]> but
was:<[[9 "None" {"times": [9]}], [12 "None" {"times": [12]}], [17 "None"
{"times": [17]}], [9 "None" {"times": [9, 15]}], [9 "None" {"times": [9,
15, 10]}], [9 "None" {"times": [9, 15, 10, 14]}]]>

Now, the test is not complete, so it's not surprising that it fails. What
really puzzles me is that there appears to be a moment when my window
contains the event at time 9 and the event at time 15, but does not yet
include the events at times 10 and 14, which should be part of the same
stream (and are indeed added later).

This code uses version 1.0.1 of flink-java, flink-streaming-java_2.11 and
flink-clients_2.11 (plus JUnit 4.12), running under Java 8 with the
relevant parts of the pom.xml uncommented.

package enx.cep;

import static org.junit.Assert.assertEquals;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;
import org.junit.Test;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class AlgebraTest {
    @Test public void flinkCanJoinTwoStreams() throws Exception {
        final List<Msg> inputs = list(
                Msg(9, "Right", "val", 1),
                Msg(10, "Left", "attr", 1),
                Msg(12, "Left", "attr", 2),
                Msg(14, "Left", "attr", 1),
                Msg(15, "Right", "val", 1),
                Msg(17, "Right", "val", 3));
        final List<Msg> expected = list(
                Msg(10, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1,
                        "Right:@t", 9, "Left:@t", 10),
                Msg(15, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1,
                        "Right:@t", 15, "Left:@t", 14));
        final List<Msg> output = runStreamAlg(inputs, source -> {
            final DataStream<Msg> RightSource =
                    source.filter(msg -> "Right".equals(msg.type));
            final DataStream<Msg> LeftSource =
                    source.filter(msg -> "Left".equals(msg.type));

            // Join Left & Right streams on
            // Left.attr == Right.val && abs(Left.t - Right.t) < 5
            final DataStream<Msg> joined = LeftSource.union(RightSource)
                    .keyBy(msg -> {
                        if ("Right".equals(msg.type)) {
                            return msg.attrs.get("val");
                        } else if ("Left".equals(msg.type)) {
                            return msg.attrs.get("attr");
                        } else {
                            throw new RuntimeException();
                        }
                    })
                    .window(GlobalWindows.create())
                    .trigger(CountTrigger.of(1))
                    .apply(new WindowFunction<Msg, Msg, Object, GlobalWindow>() {
                        @Override
                        public void apply(Object _key, GlobalWindow _w,
                                          Iterable<Msg> ins,
                                          Collector<Msg> collector) throws Exception {
                            final List<Integer> times =
                                    StreamSupport.stream(ins.spliterator(), false)
                                            .map(m -> m.timestamp)
                                            .collect(Collectors.toList());
                            collector.collect(Msg(
                                    times.stream().mapToInt(i -> i).min().getAsInt(),
                                    "None", "times", times));
                        }
                    });

            return joined;
        });
        assertEquals(expected, output);
    }

    private final List<Msg> runStreamAlg(List<Msg> input,
                                         Function<DataStream<Msg>, DataStream<Msg>> fn) {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        final DataStream<Msg> source = env.fromCollection(input)
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Msg>() {
                    @Override
                    public long extractAscendingTimestamp(Msg msg) {
                        return msg.timestamp * 1000;
                    }
                });
        final DataStream<Msg> transformed = fn.apply(source);

        final List<Msg> res = new ArrayList<>();
        try (final ServerSocket server = new ServerSocket(0)) {
            final int serverPort = server.getLocalPort();

            transformed.addSink(m -> {
                try (final Socket client = new Socket("localhost", serverPort)) {
                    final ObjectOutputStream toServer =
                            new ObjectOutputStream(client.getOutputStream());
                    toServer.writeObject(m);
                    toServer.flush();
                    toServer.close();
                }
            });

            final Thread t = new Thread(() -> {
                while (true) {
                    try (final ObjectInputStream in =
                                 new ObjectInputStream(server.accept().getInputStream())) {
                        res.add((Msg) in.readObject());
                        server.setSoTimeout(500);
                    } catch (SocketTimeoutException e) {
                        return;
                    } catch (java.io.IOException | ClassNotFoundException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
            t.start();
            try {
                env.execute();
            } catch (Exception e) {
                e.printStackTrace();
            }
            t.join();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return res;
    }

    private static <T> List<T> list(T elem, T... others) {
        final List<T> res = new ArrayList<>();
        res.add(elem);
        for (T t : others) {
            res.add(t);
        }
        return res;
    }

    private static Msg Msg(int timestamp, String type, Object... attrs) {
        return new Msg(timestamp, type, attrs);
    }

    private static class Msg implements Serializable {
        private final String type;
        private final int timestamp;
        private final Map<String, Object> attrs;

        public Msg(int timestamp, String type, Object... attrs) {
            this.timestamp = timestamp;
            this.type = type;
            this.attrs = new HashMap<>();
            if (attrs.length % 2 != 0) throw new IllegalArgumentException();
            for (int i = 0; i < attrs.length; i += 2) {
                if (!(attrs[i] instanceof String)) throw new IllegalArgumentException();
                this.attrs.put((String) attrs[i], attrs[i + 1]);
            }
        }

        public String toString() {
            return String.format("[%d \"%s\" {%s}]",
                    this.timestamp,
                    this.type,
                    this.attrs.entrySet().stream()
                            .sorted((e1, e2) -> e1.getKey().compareTo(e2.getKey()))
                            .map(e -> String.format("\"%s\": %s", e.getKey(), e.getValue()))
                            .reduce((acc, el) -> acc + ", " + el)
                            .orElseGet(() -> ""));
        }
    }
}

Re: Surprising order of events in union of two streams

Aljoscha Krettek
Hi,
Flink does not make any guarantees about the order in which elements arrive,
except for one-to-one forwarding patterns. That is, only for operations such
as map/flatMap/filter is the order in which two successive operators see
their elements guaranteed to be the same.
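
To see why, here is a small standalone Java analogy (this is not Flink code; all names are illustrative): two producer threads feeding one consumer interleave in whichever order the scheduler runs them, which is essentially the situation downstream of a union with parallel inputs. Only the order within each producer is preserved.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Analogy only: two producers push into one shared queue; the relative order of
// their elements differs from run to run, while each producer's own order is kept.
public class InterleavingDemo {
    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<String> downstream = new LinkedBlockingQueue<>();

        final Thread left = producer(downstream, "Left", new int[]{10, 12, 14});
        final Thread right = producer(downstream, "Right", new int[]{9, 15, 17});
        left.start();
        right.start();
        left.join();
        right.join();

        // Prints some interleaving of the Left and Right elements.
        for (String element : downstream) {
            System.out.println(element);
        }
    }

    private static Thread producer(BlockingQueue<String> out, String name, int[] timestamps) {
        return new Thread(() -> {
            for (int t : timestamps) {
                out.add(name + "@" + t);
            }
        });
    }
}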

Could you please describe in prose what the expected outcome of your
windowing specification is? We could start from that and try to figure out
how to make Flink behave as it should.

Cheers,
Aljoscha

On Thu, 14 Apr 2016 at 16:32 Gary Verhaegen <[hidden email]> wrote:

Re: Surprising order of events in union of two streams

Gary Verhaegen
Hi Aljoscha,

What I'm looking for is an operator that joins two streams together, but
keeps the events in timestamp order.

What I was trying to do with the window specification comes down to: for
each event on that stream, I want to call this function with this event and
all of the events that arrived within 5 minutes after it. So conceptually a
sliding window, except that the width is defined in terms of time and the
step in terms of count.
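
To make that concrete, here is a minimal plain-Java sketch of the semantics I have in mind (outside Flink; the Event type and the horizon parameter are just illustrative), assuming the input list is already sorted by timestamp:

import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the intended per-event window, assuming the input list is
// already sorted by timestamp.
public class PerEventTimeWindowSketch {

    static class Event {
        final long timestamp;
        final String payload;
        Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // For each event e, collect e together with every later event whose
    // timestamp lies within `horizon` time units of e's timestamp.
    static List<List<Event>> perEventWindows(List<Event> sortedEvents, long horizon) {
        final List<List<Event>> result = new ArrayList<>();
        for (int i = 0; i < sortedEvents.size(); i++) {
            final Event head = sortedEvents.get(i);
            final List<Event> window = new ArrayList<>();
            for (int j = i; j < sortedEvents.size()
                    && sortedEvents.get(j).timestamp - head.timestamp < horizon; j++) {
                window.add(sortedEvents.get(j));
            }
            result.add(window);
        }
        return result;
    }
}

With the inputs from the test above and a horizon of 5, the window for the event at time 10 would then contain the events at times 10, 12 and 14.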

What I will really need to do is probably manage the window myself in
operator state (because in many cases I expect that I will not need the
whole window, so it may be interesting to be able to evaluate that
eagerly), but I think I really need the events to arrive in order.

On 14 April 2016 at 17:04, Aljoscha Krettek <[hidden email]> wrote:

Re: Surprising order of events in union of two streams

Aljoscha Krettek
Hi,
Yes, I'm afraid you need a custom operator for that. (We are working on
providing built-in support for this, though.)

I sketched an Operator that does the sorting and also wrote a quick example
that uses it:
SortedWindowOperator:
https://gist.github.com/aljoscha/6600bc1121b7f8a0f68b89988dd341bd
SortedWindowExample:
https://gist.github.com/aljoscha/959fc61aff2cd774fe60da711f5ae40b

It puts incoming elements into buckets based on their timestamp and sorts
each bucket once the watermark passes the end of that bucket. Take a look at
processWatermark(); that is where you would fill in your custom logic.
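
To spell out the mechanism in plain Java (a conceptual sketch of the bucket-and-sort idea only, not the gist itself; the bucket size, the Timestamped interface and the emit callback are made up for illustration):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Conceptual sketch: buffer elements in time buckets, then sort and emit each
// bucket once the watermark has passed the end of that bucket.
public class WatermarkSortSketch<T extends WatermarkSortSketch.Timestamped> {

    public interface Timestamped {
        long timestamp();
    }

    private final long bucketSize;
    private final Consumer<T> emit;
    // Buckets keyed by bucket start time, kept in ascending order.
    private final TreeMap<Long, List<T>> buckets = new TreeMap<>();

    public WatermarkSortSketch(long bucketSize, Consumer<T> emit) {
        this.bucketSize = bucketSize;
        this.emit = emit;
    }

    // Called for every incoming element, in whatever order it arrives.
    public void onElement(T element) {
        long bucketStart = element.timestamp() - (element.timestamp() % bucketSize);
        buckets.computeIfAbsent(bucketStart, k -> new ArrayList<>()).add(element);
    }

    // Called when the watermark advances: every bucket that ends at or before
    // the watermark can no longer receive elements, so sort and emit it.
    public void onWatermark(long watermark) {
        while (!buckets.isEmpty() && buckets.firstKey() + bucketSize <= watermark) {
            Map.Entry<Long, List<T>> bucket = buckets.pollFirstEntry();
            bucket.getValue().sort((a, b) -> Long.compare(a.timestamp(), b.timestamp()));
            bucket.getValue().forEach(emit);
        }
    }
}

The idea is that an operator built along these lines would feed its incoming elements into something like onElement() and drive onWatermark() from processWatermark().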

Cheers,
Aljoscha

On Fri, 15 Apr 2016 at 14:41 Gary Verhaegen <[hidden email]> wrote:

RE: Surprising order of events in union of two streams

Eron Wright
Not entirely related, but for the special case of writing a parallelized source that emits records in event time order, I found the MergeIterator to be most useful. Here's an example:
https://github.com/nupic-community/flink-htm/blob/eb29f97f08f3482b32228db7284f669aad8dce2e/flink-htm-streaming-scala/src/main/scala/org/numenta/nupic/flink/streaming/connectors/river/RiverSource.scala#L157
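
For reference, here is a minimal plain-Java sketch of the k-way merge idea behind that approach (independent of the linked Scala code; class and method names are illustrative): given several per-split iterators that are each already sorted by timestamp, a priority queue over their current head elements yields a single, globally sorted iterator.

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Sketch of a k-way merge: given several iterators that are each already sorted
// according to `order`, produce one iterator that yields all elements in sorted order.
public final class MergeSketch {

    public static <T> Iterator<T> mergeSorted(List<Iterator<T>> inputs, Comparator<T> order) {
        // One queue entry per non-empty input, ordered by that input's current head element.
        final Comparator<PeekedIterator<T>> byHead = (a, b) -> order.compare(a.head, b.head);
        final PriorityQueue<PeekedIterator<T>> queue = new PriorityQueue<>(byHead);
        for (Iterator<T> input : inputs) {
            if (input.hasNext()) {
                queue.add(new PeekedIterator<T>(input));
            }
        }
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                return !queue.isEmpty();
            }

            @Override
            public T next() {
                final PeekedIterator<T> smallest = queue.poll();
                final T result = smallest.head;
                if (smallest.advance()) {
                    queue.add(smallest); // re-insert under its new head element
                }
                return result;
            }
        };
    }

    // Wraps an iterator and keeps its current head element available for comparison.
    private static final class PeekedIterator<T> {
        private final Iterator<T> rest;
        private T head;

        PeekedIterator(Iterator<T> rest) {
            this.rest = rest;
            this.head = rest.next();
        }

        boolean advance() {
            if (rest.hasNext()) {
                head = rest.next();
                return true;
            }
            return false;
        }
    }
}
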
-Eron Wright

> From: [hidden email]
> Date: Mon, 18 Apr 2016 08:20:11 +0000
> Subject: Re: Surprising order of events in union of two streams
> To: [hidden email]
> CC: [hidden email]