Discard out-of-order events

Discard out-of-order events

Kevin Jacobs
Is it possible to discard events that are out-of-order (in terms of
event time)?

Re: Discard out-of-order events

mxm
Hi!

I'm not sure whether I understand your question. The purpose of event
time is to be able to process out-of-order events. Do you want to
discard late elements? In the upcoming Flink 1.1.0 you can set the
`allowedLateness` on a windowed stream. The default is 0, which means
that late elements (elements whose timestamp is already behind the
watermark when they reach the operator) are discarded once the
watermark has passed the end of their window.
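
A minimal sketch of that rule as a plain-Scala model, not Flink code. The `LatenessModel` object and its methods are hypothetical, and the sketch assumes tumbling event-time windows, non-negative timestamps and a zero window offset. It follows the documented behaviour that an element is only dropped once the watermark has passed the end of its window plus `allowedLateness`; the exact boundary (`<=`) is an assumption of the model.

```scala
object LatenessModel {
  // Start of the tumbling window [start, start + size) that contains ts.
  // Assumes ts >= 0 and a window offset of 0.
  def windowStart(ts: Long, size: Long): Long = ts - (ts % size)

  // Largest timestamp that still belongs to that window.
  def windowMaxTimestamp(ts: Long, size: Long): Long =
    windowStart(ts, size) + size - 1

  // An element is "late" if its timestamp is not after the current watermark.
  def isBehindWatermark(ts: Long, watermark: Long): Boolean = ts <= watermark

  // Assumed rule: the element is discarded once the watermark has reached
  // the window's max timestamp plus the allowed lateness.
  def isDropped(ts: Long, size: Long, allowedLateness: Long, watermark: Long): Boolean =
    windowMaxTimestamp(ts, size) + allowedLateness <= watermark
}
```

With window size 10 and the default lateness of 0, an element with timestamp 5 is dropped at watermark 12, while an element with timestamp 11 is behind the watermark but still accepted, because its window [10, 20) has not been closed yet.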

Cheers,
Max

On Thu, Jul 28, 2016 at 6:49 PM, Kevin Jacobs <[hidden email]> wrote:

Re: Discard out-of-order events

Kevin Jacobs
Good morning :-),

Thank you for your answer. Let me explain my problem more thoroughly
(maybe other options are possible here, not necessarily with allowedLateness).

The most compact description of my problem would be stream enrichment.
More concretely, suppose I have two streams and I want to enrich one
stream with the other:

val mainStream = env.fromElements("a", "a", "b", "a", "a", "b", "b", "a", "c", "b", "a", "c")

val infoStream = env.fromElements((1, "a", "It is F"), (2, "b", "It is B"), (3, "c", "It is C"), (4, "a", "Whoops, it is A"))


The goal here is to mix information from the infoStream into the
mainStream, so that the enriched stream contains the following tuples:

("a", "Whoops, it is A")
("a", "Whoops, it is A")
("b", "It is B")
("a", "Whoops, it is A")
("a", "Whoops, it is A")
("b", "It is B")
("b", "It is B")
("a", "Whoops, it is A")
("c", "It is C")
("b", "It is B")
("a", "Whoops, it is A")
("c", "It is C")


It is not a requirement that the enriched stream has the same ordering
of elements as the mainStream. However, it is important that newer
elements in the infoStream override older elements with the same key.
For example, (4, "a", "Whoops, it is A") arrived later than (1, "a",
"It is F") in terms of event time, which is the first element of every
tuple. So, at t=4, the infoStream should contain the following tuples:

(4, "a", "Whoops, it is A")
(2, "b", "It is B")
(3, "c", "It is C")
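
To pin down the intended result independently of Flink, here is a small batch sketch in plain Scala. The `EnrichReference` object is hypothetical and not part of any Flink API: it keeps the info record with the highest event time per key, then looks up each mainStream element, falling back to "?" when no info exists (the same default the coGroup code uses).

```scala
object EnrichReference {
  // (event time, key, description), as in the infoStream tuples
  type Info = (Int, String, String)

  // For every key, keep the info record with the highest event time.
  def latestPerKey(infos: Seq[Info]): Map[String, Info] =
    infos.groupBy(_._2).map { case (key, records) => key -> records.maxBy(_._1) }

  // Enrich each main element with the newest description, or "?" if none exists.
  def enrich(main: Seq[String], infos: Seq[Info]): Seq[(String, String)] = {
    val latest = latestPerKey(infos)
    main.map(key => (key, latest.get(key).map(_._3).getOrElse("?")))
  }
}
```

Applied to the two example inputs, this yields the twelve tuples listed above: six ("a", "Whoops, it is A"), four ("b", "It is B") and two ("c", "It is C").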


So what I came up with is to iterate over the infoStream to keep only
the relevant (newest) records, and then coGroup it with the mainStream
to enrich it. This approach works fine:

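import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment

val mainStream = env.fromElements("a", "a", "b", "a", "a", "b", "b", "a", "c", "b", "a", "c")
val infoStream = env
  .fromElements((1, "a", "It is F"), (2, "b", "It is B"), (3, "c", "It is C"), (4, "a", "Whoops, it is A"))
  .keyBy(1)
  .iterate(iteration => {
    // Per key (field 1), keep the record with the highest event time (field 0).
    val filtered = iteration.keyBy(1).maxBy(0)
    // Returns (feedback, output). Caution: feeding `iteration` back unchanged
    // sends every record around the loop again.
    (iteration, filtered)
  })

mainStream
  .coGroup(infoStream)
  .where[String]((x: String) => x)
  .equalTo(_._2)
  .window(TumblingProcessingTimeWindows.of(Time.milliseconds(50)))
  .apply {
    (first: Iterator[String], second: Iterator[(Int, String, String)], out: Collector[(String, String)]) => {
      // Buffer the info records that fell into this window.
      val records = scala.collection.mutable.MutableList[(Int, String, String)]()
      for (record <- second) {
        records += record
      }

      // Enrich every main element: the last matching record wins, "?" if none.
      for (key <- first) {
        var bestDescription = "?"
        for (record <- records) {
          if (record._2 == key) {
            bestDescription = record._3
          }
        }
        out.collect((key, bestDescription))
      }
    }
  }
  .print()

env.execute()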
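
For comparison, a windowless variant would keep the newest info record per key as state and enrich each main element as it arrives. The sketch below models that idea in plain Scala, outside Flink; `StatefulEnrichment`, `InfoUpdate`, `MainElement` and `run` are hypothetical names, and mapping this onto Flink (presumably some form of keyed state on connected streams) is an assumption, not a tested job. Unlike the windowed coGroup, a main element only sees info updates that arrived before it, and updates with an older event time are ignored.

```scala
import scala.collection.mutable

object StatefulEnrichment {
  // Hypothetical event types: an info update or an element of the main stream.
  sealed trait Event
  final case class InfoUpdate(time: Int, key: String, description: String) extends Event
  final case class MainElement(key: String) extends Event

  // Processes events in arrival order. The state holds, per key, the newest
  // (event time, description) seen so far.
  def run(events: Seq[Event]): List[(String, String)] = {
    val state = mutable.Map.empty[String, (Int, String)]
    val out = mutable.ListBuffer.empty[(String, String)]
    events.foreach {
      case InfoUpdate(time, key, description) =>
        // Overwrite only if this update is not older than the stored one.
        if (state.get(key).forall { case (storedTime, _) => storedTime <= time })
          state(key) = (time, description)
      case MainElement(key) =>
        // Enrich with the current description, or "?" if none arrived yet.
        out += ((key, state.get(key).map(_._2).getOrElse("?")))
    }
    out.toList
  }
}
```

For example, the events (1, "a", "It is F"), main "a", (4, "a", "Whoops, it is A"), (2, "a", "stale"), main "a", main "b" produce ("a", "It is F"), ("a", "Whoops, it is A") and ("b", "?").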


My questions for you:
- Can I make this more efficient?
- Is there a way of mixing DataSets and DataStreams? That would be
really awesome (at least for this use case).
- Is there a way to ensure checkpointing works, since I am using an
iterative stream here?
- Can I get rid of the TumblingProcessingTimeWindows? All of this can
also be done with Apache Spark, so it would be great if Apache Flink
could achieve a higher throughput than Apache Spark in this use case.

I am curious about your answers!

Cheers,
Kevin


On 29.07.2016 10:40, Maximilian Michels wrote:

Re: Discard out-of-order events

Gyula Fóra
In reply to this post by mxm
Hi Max,

So, if I understand correctly, window operators now discard late
elements by default?
Is this documented somewhere?

Gyula

Maximilian Michels <[hidden email]> wrote (time: Fri, Jul 29, 2016,
10:40):

Re: Discard out-of-order events

mxm
@Gyula: This is documented in the JavaDoc of the `allowedLateness(..)`
method and in the docs:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#dealing-with-late-data

@Kevin: Thanks for the explanation, I'll get back to you soon (sort of
in a rush).

Cheers,
Max

On Fri, Jul 29, 2016 at 10:57 AM, Gyula Fóra <[hidden email]> wrote:
