[Discussion] Query regarding Join

Vinay Patil
Hi,

I have a question regarding the join operation. Consider the following
dummy example:

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStreamSource<Integer> sourceStream =
        env.fromElements(10, 20, 23, 25, 30, 33, 102, 18);
DataStreamSource<Integer> destStream = env.fromElements(20, 30, 40, 50, 60, 10);

sourceStream.join(destStream)
    .where(new ElementSelector())
    .equalTo(new ElementSelector())
    .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
    .apply(new JoinFunction<Integer, Integer, Integer>() {

        private static final long serialVersionUID = 1L;

        @Override
        public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
            return paramIN1;
        }
    }).print();

I correctly get the elements that match in both streams; however, my
requirement is to write these matched elements and also the unmatched
elements to a sink (S3).

How do I get the unmatched elements from each stream?

Regards,
Vinay Patil

Re: [Discussion] Query regarding Join

Matthias J. Sax
You need to do an outer join. However, there is no built-in support for
outer joins yet.

You can use a Window-CoGroup to implement the outer join as your own operator.


-Matthias


Re: [Discussion] Query regarding Join

Vinay Patil
Hi Matthias,

I did not get you. Even if we use CoGroup, we have to apply it on a key:

sourceStream.coGroup(destStream)
    .where(new ElementSelector())
    .equalTo(new ElementSelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .apply(new CoGroupFunction<Integer, Integer, Integer>() {

        private static final long serialVersionUID = 6408179761497497475L;

        @Override
        public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> paramIterable1,
                Collector<Integer> paramCollector) throws Exception {
            Iterator<Integer> iterator = paramIterable.iterator();
            while (iterator.hasNext()) {
            }
        }
    });

When I debug this, only the matched elements from both streams come into
the coGroup function.

What I want to know is: how do I check for the unmatched elements from both
streams and write them to the sink?

Regards,
Vinay Patil

*+91-800-728-4749*


Re: [Discussion] Query regarding Join

伍翀(云邪)
In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2, Collector<Integer> out)`, when both iter1 and iter2 are non-empty, they hold the matched elements from both streams.
When one of iter1 and iter2 is empty, the elements are unmatched.
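
For illustration, here is a minimal sketch of that check inside the CoGroupFunction (Integer elements and the same classes as the snippets above, i.e. org.apache.flink.api.common.functions.CoGroupFunction and org.apache.flink.util.Collector; emitting every element of a side rather than only the first is an assumption about the desired behaviour):

.apply(new CoGroupFunction<Integer, Integer, Integer>() {

    @Override
    public void coGroup(Iterable<Integer> source, Iterable<Integer> dest,
            Collector<Integer> out) throws Exception {
        boolean sourceEmpty = !source.iterator().hasNext();
        boolean destEmpty = !dest.iterator().hasNext();
        if (!sourceEmpty && !destEmpty) {
            // matched: the key occurred in both streams within this window
            for (Integer s : source) {
                out.collect(s);
            }
        } else if (destEmpty) {
            // unmatched: the key occurred only in the source stream
            for (Integer s : source) {
                out.collect(s);
            }
        } else {
            // unmatched: the key occurred only in the destination stream
            for (Integer d : dest) {
                out.collect(d);
            }
        }
    }
})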


- Jark Wu (wuchong)


Re: [Discussion] Query regarding Join

Vinay Patil
You are right; I debugged it for all elements, and I can do that now.
Thanks a lot.

Regards,
Vinay Patil


Re: [Discussion] Query regarding Join

Vinay Patil
Hi Jark,

I am able to get the non-matching elements in a stream.

Of course we can collect the matching elements in the same stream as well,
however I want to perform additional operations on the joined stream before
writing it to S3, so I would have to include a separate join operator for
the same two streams, right?
Correct me if I am wrong.

I have pasted the dummy code which collects the non-matching records (I
have to perform this on the actual data; correct me if I am doing it wrong).

sourceStream.coGroup(destStream)
    .where(new ElementSelector())
    .equalTo(new ElementSelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .apply(new CoGroupFunction<Integer, Integer, Integer>() {

        private static final long serialVersionUID = 6408179761497497475L;

        @Override
        public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> paramIterable1,
                Collector<Integer> paramCollector) throws Exception {
            long exactSizeIfKnown = paramIterable.spliterator().getExactSizeIfKnown();
            long exactSizeIfKnown2 = paramIterable1.spliterator().getExactSizeIfKnown();
            if (exactSizeIfKnown == 0) {
                paramCollector.collect(paramIterable1.iterator().next());
            } else if (exactSizeIfKnown2 == 0) {
                paramCollector.collect(paramIterable.iterator().next());
            }
        }
    }).print();

Regards,
Vinay Patil



Re: [Discussion] Query regarding Join

Fabian Hueske
Can you add a flag to each element emitted by the CoGroupFunction that
indicates whether it was joined or not?
Then you can use split to distinguish between both cases and handle both
streams differently.

Best, Fabian
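
A rough sketch of that idea, against the DataStream split()/OutputSelector API of that Flink 1.x generation (the Boolean flag in a Tuple2, the output names "joined"/"unjoined" and the variable names are illustrative assumptions, not a prescribed implementation; the extra imports would be org.apache.flink.api.java.tuple.Tuple2, org.apache.flink.streaming.api.datastream.SplitStream, org.apache.flink.streaming.api.collector.selector.OutputSelector and java.util.Collections):

DataStream<Tuple2<Boolean, Integer>> tagged = sourceStream.coGroup(destStream)
    .where(new ElementSelector())
    .equalTo(new ElementSelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .apply(new CoGroupFunction<Integer, Integer, Tuple2<Boolean, Integer>>() {

        @Override
        public void coGroup(Iterable<Integer> source, Iterable<Integer> dest,
                Collector<Tuple2<Boolean, Integer>> out) throws Exception {
            boolean joined = source.iterator().hasNext() && dest.iterator().hasNext();
            if (joined) {
                // joined case: emit the source-side elements, mirroring the original JoinFunction
                for (Integer s : source) {
                    out.collect(new Tuple2<>(true, s));
                }
            } else {
                // outer case: exactly one of the two sides is non-empty
                for (Integer s : source) {
                    out.collect(new Tuple2<>(false, s));
                }
                for (Integer d : dest) {
                    out.collect(new Tuple2<>(false, d));
                }
            }
        }
    });

SplitStream<Tuple2<Boolean, Integer>> splitStream = tagged.split(
    new OutputSelector<Tuple2<Boolean, Integer>>() {

        @Override
        public Iterable<String> select(Tuple2<Boolean, Integer> value) {
            return Collections.singletonList(value.f0 ? "joined" : "unjoined");
        }
    });

DataStream<Tuple2<Boolean, Integer>> joinedStream = splitStream.select("joined");     // extra processing, then the S3 sink
DataStream<Tuple2<Boolean, Integer>> unjoinedStream = splitStream.select("unjoined"); // can go straight to the S3 sink

This way the coGroup runs only once, and each of the two result streams can get its own handling and sink.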


Re: [Discussion] Query regarding Join

Vinay Patil
Hi,

I am able to get the matching and non-matching elements.

However, when I am unit testing the code, I am getting one record less
inside the overridden coGroup function.
I am testing the following way:

1) Insert 5 messages into a local Kafka topic (test1)
2) Insert 5 different messages into a local Kafka topic (test2)
3) Consume 1) and 2) so that I have two different Kafka streams
4) Generate an ascending timestamp (using event time) for both streams and
create the key (String)

Up to 4) I am able to get all the records (checked by printing the
streams to a text file).

However, when I send the streams to the co-group operator, I receive one
record less, using the following syntax:

sourceStream.coGroup(destStream)
    .where(new ElementSelector())
    .equalTo(new ElementSelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .apply(new JoinStreams());

Also, I have inserted a sysout in the ElementSelector, and I am getting 20
sysouts instead of 10 (10 sysouts for the source and 10 for the dest stream).

I am unable to understand why one record fewer is reaching the co-group.



Regards,
Vinay Patil


Re: [Discussion] Query regarding Join

Aljoscha Krettek
Hi,
what timestamps are you assigning? Is it guaranteed that all of them would
fall into the same 30 second window?

The issue with the duplicate printing in the ElementSelector is strange. Could
you post a more complete code example so that I can reproduce the problem?

Cheers,
Aljoscha


Re: [Discussion] Query regarding Join

Vinay Patil
Hi,

Actually, I am only publishing 5 messages each to two different Kafka topics
(using JUnit); even if I set the window to 500 seconds the result is the same.

I do not understand why it is not sending the 5th element to the co-group
operator even when the keys are the same.

I cannot share the actual client code, but this is what the streams look like:

sourceStream.coGroup(destStream)

Here sourceStream and destStream are actually of type Tuple2<String, DTO>, and
the ElementSelector returns tuple.f0, which is the key.

I am generating the timestamp based on a field from the DTO which is
guaranteed to be in order.

Will using triggers help here?


Regards,
Vinay Patil

*+91-800-728-4749*


Re: [Discussion] Query regarding Join

Vinay Patil
Just an update: when I keep IngestionTime and remove the timestamp I am
generating, I get all the records, but with EventTime I get one record less.
I checked the time difference between the two records; it is 3 minutes. I
tried setting the window time to 5 minutes, but even that did not work.

Even when I try assigning a timestamp with IngestionTime, I get one record
less, so can I safely use IngestionTime, or is it always advisable to use
EventTime?

Regards,
Vinay Patil


Re: [Discussion] Query regarding Join

Vinay Patil
Hi,

Following are the timestamps I am getting from the DTO; here are the
timestamps of the two records:
1466115892162154279
1466116026233613409

So the time difference is roughly 3 minutes. Even if I apply a window of 5
minutes, I am not getting the last record (the last timestamp value above). I
am using an ascending timestamp extractor to generate the timestamps
(assuming that the timestamps are always in order).

I was at least expecting the data to reach the co-group function.
What could be the reason for the data loss? The data we are getting is
critical, so we cannot afford to lose any data.


Regards,
Vinay Patil
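
For reference, a minimal sketch of how such an ascending-timestamp assignment could look (this is not the actual client code: getEventTime() is a made-up accessor for the DTO field mentioned above, and the exact class name/signature of AscendingTimestampExtractor varies slightly between Flink 1.x releases). One thing worth checking is that Flink's event-time windows expect timestamps in milliseconds since the epoch, so a field recorded in another unit (the 19-digit values above look like nanoseconds) would have to be converted in the extractor:

DataStream<Tuple2<String, DTO>> sourceWithTimestamps = sourceStream
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, DTO>>() {

        @Override
        public long extractAscendingTimestamp(Tuple2<String, DTO> element) {
            // getEventTime() is hypothetical; whatever the DTO exposes has to be
            // returned as milliseconds since the epoch, e.g. nanoseconds / 1_000_000
            return element.f1.getEventTime();
        }
    });

// the same assignment would be applied to destStream before the coGroup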

Reply | Threaded
Open this post in threaded view
|

Re: [Discussion] Query regarding Join

Aljoscha Krettek-2
Hi,
first, regarding tumbling windows: even with 5-minute windows it can
happen that elements that are only seconds apart go into different
windows. Consider the following case:

|                x | x                 |

These are two 5-minute windows and the two elements are only seconds apart,
but they go into different windows because windows are aligned to the epoch.
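
To make the alignment concrete, here is a small sketch (plain Java, not the
Flink API itself) of how the window start is derived for a tumbling window
with no offset:

// Window starts are aligned to multiples of the window size (i.e. to the
// epoch), independent of when the first element arrives.
static long windowStartFor(long timestampMillis, long windowSizeMillis) {
    return timestampMillis - (timestampMillis % windowSizeMillis);
}

// Example with windowSizeMillis = 5 * 60 * 1000:
// an element at 09:04:59 falls into [09:00, 09:05)
// an element at 09:05:01 falls into [09:05, 09:10)
// only 2 seconds apart, yet two different windows.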

Now, for the ascending timestamp extractor. The reason this can behave in
unexpected ways is that it emits a watermark that is "last timestamp - 1",
i.e. if it has seen timestamp t it can only emit watermark t-1 because
there might be other elements with timestamp t arriving. If you have a
continuous stream of elements you wouldn't notice this. Only in this
constructed example does it become visible.
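
In simplified form, the assigner behaves roughly like this (a sketch of the
idea, not the actual Flink class; the accessor on the element is a
placeholder):

new AssignerWithPeriodicWatermarks<Tuple2<String, DTO>>() {

    private long currentTimestamp = Long.MIN_VALUE + 1;

    @Override
    public long extractTimestamp(Tuple2<String, DTO> element, long previousElementTimestamp) {
        currentTimestamp = element.f1.getEventTime(); // placeholder, epoch millis
        return currentTimestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // the watermark always trails the last seen timestamp by 1 ms, so the
        // window containing the newest element only fires once an element with
        // a strictly larger timestamp arrives (or a bounded input ends)
        return new Watermark(currentTimestamp - 1);
    }
};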

Cheers,
Aljoscha

Reply | Threaded
Open this post in threaded view
|

Re: [Discussion] Query regarding Join

Vinay Patil
Hi Aljoscha,

Thank you for your response.
So do you suggest using a different approach for extracting timestamps (as
given in the documentation) instead of the AscendingTimestampExtractor?
Is that the reason I am seeing this unexpected behaviour? And in the case
of a continuous stream I would not see any data loss?

Also, assuming that the records are always going to be in order, which is
the better approach: Ingestion Time or Event Time?



Regards,
Vinay Patil

Reply | Threaded
Open this post in threaded view
|

Re: [Discussion] Query regarding Join

Aljoscha Krettek-2
Hi,
ingestion time can only be used if you don't care about the timestamps in
the elements, so if you do have them you should probably use event time.

If your timestamps really are strictly increasing then the ascending
extractor is fine. And with a continuous stream of incoming elements you
will not run into the problem of the last elements being held back.

By the way, when using Kafka you can also embed the timestamp extractor
directly in the Kafka consumer. This is described here:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
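
Roughly like this (a sketch based on the linked page; the consumer version,
topic name, deserialization schema and DTO accessor below are placeholders
for whatever you actually use):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "join-test");

FlinkKafkaConsumer09<DTO> consumer =
    new FlinkKafkaConsumer09<>("test1", new MyDtoDeserializationSchema(), props);

// embed the extractor in the consumer so that timestamps/watermarks are
// generated per Kafka partition before the partition streams are merged
consumer.assignTimestampsAndWatermarks(
    new AscendingTimestampExtractor<DTO>() {
        @Override
        public long extractAscendingTimestamp(DTO element) {
            return element.getEventTime(); // placeholder, epoch millis
        }
    });

DataStream<DTO> sourceStream = env.addSource(consumer);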

Cheers,
Aljoscha

Reply | Threaded
Open this post in threaded view
|

Re: [Discussion] Query regarding Join

Vinay Patil
Hi Aljoscha,

Thanks a lot for your inputs.

I still did not follow why I will not face this issue in the case of a
continuous stream. Let's consider the following example:
assume that the stream runs continuously from Monday to Friday, and on
Friday it stops after 5:00 PM; will I still face this issue then?

I am actually not able to understand how it would differ for real-time
streams.

Regards,
Vinay Patil

On Tue, Jun 28, 2016 at 5:07 PM, Aljoscha Krettek <[hidden email]>
wrote:

> Hi,
> ingestion time can only be used if you don't care about the timestamp in
> the elements. So if you have those you should probably use event time.
>
> If your timestamps really are strictly increasing then the ascending
> extractor is good. And if you have a continuous stream of incoming elements
> you will not see the behavior of not getting the last elements.
>
> By the way, when using Kafka you can also embed the timestamp extractor
> directly in the Kafka consumer. This is described here:
>
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>
> Cheers,
> Aljoscha
>
> On Tue, 28 Jun 2016 at 11:44 Vinay Patil <[hidden email]> wrote:
>
> > Hi Aljoscha,
> >
> > Thank you for your response.
> > So do you suggest to use different approach for extracting timestamp (as
> > given in document) instead of AscendingTimeStamp Extractor ?
> > Is that the reason I am seeing this unexpected behaviour ? in case of
> > continuous stream I would not see any data loss ?
> >
> > Also assuming that the records are always going to be in order , which is
> > the best approach : Ingestion Time or Event Time ?
> >
> >
> >
> > Regards,
> > Vinay Patil
> >
> > On Tue, Jun 28, 2016 at 2:41 PM, Aljoscha Krettek <[hidden email]>
> > wrote:
> >
> > > Hi,
> > > first regarding tumbling windows: even if you have 5 minute windows it
> > can
> > > happen that elements that are only seconds apart go into different
> > windows.
> > > Consider the following case:
> > >
> > > |                x | x                 |
> > >
> > > These are two 5-mintue windows and the two elements are only seconds
> > apart
> > > but go into different windows because windows are aligned to epoch.
> > >
> > > Now, for the ascending timestamp extractor. The reason this can behave
> in
> > > unexpected ways is that it emits a watermark that is "last timestamp -
> > 1",
> > > i.e. if it has seen timestamp t it can only emit watermark t-1 because
> > > there might be other elements with timestamp t arriving. If you have a
> > > continuous stream of elements you wouldn't notice this. Only in this
> > > constructed example does it become visible.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > On Tue, 28 Jun 2016 at 06:04 Vinay Patil <[hidden email]>
> > wrote:
> > >
> > > > Hi,
> > > >
> > > > Following is the timestamp I am getting from DTO, here is the
> timestamp
> > > > difference between the two records :
> > > > 1466115892162154279
> > > > 1466116026233613409
> > > >
> > > > So the time difference is roughly 3 min, even if I apply the window
> of
> > > 5min
> > > > , I am not getting the last record (last timestamp value above),
> > > > using ascending timestamp extractor for generating the timestamp
> > > (assuming
> > > > that the timestamp are always in order)
> > > >
> > > > I was at-least expecting data to reach the co-group function.
> > > > What could be the reason for the data loss ? The data we are getting
> is
> > > > critical, hence we cannot afford to loose any data
> > > >
> > > >
> > > > Regards,
> > > > Vinay Patil
> > > >
> > > > On Mon, Jun 27, 2016 at 11:31 PM, Vinay Patil <
> [hidden email]
> > >
> > > > wrote:
> > > >
> > > > > Just an update, when I keep IngestionTime and remove the timestamp
> I
> > am
> > > > > generating, I am getting all the records, but for Event Time I am
> > > getting
> > > > > one less record, I checked the Time Difference between two records,
> > it
> > > > is 3
> > > > > min, I tried keeping the window time to 5 mins, but that even did
> not
> > > > work.
> > > > >
> > > > > Even when I try assigning timestamp for IngestionTime, I get one
> > record
> > > > > less, so should I safely use Ingestion Time or is it always
> advisable
> > > to
> > > > > use EventTime ?
> > > > >
> > > > > Regards,
> > > > > Vinay Patil
> > > > >
> > > > > On Mon, Jun 27, 2016 at 8:16 PM, Vinay Patil <
> > [hidden email]>
> > > > > wrote:
> > > > >
> > > > >> Hi ,
> > > > >>
> > > > >> Actually I am only publishing 5 messages each to two different

Re: [Discussion] Query regarding Join

Aljoscha Krettek-2
Hi,
the reason why the last element might never be emitted is the way the
ascending timestamp extractor works. I'll try and explain with an example.

Let's say we have a window size of 2 milliseconds and elements arrive
starting at timestamp 0; the window begin timestamp is inclusive and the
end timestamp is exclusive:

Element 0, Timestamp 0 (at this point the watermark is -1)
Element 1, Timestamp 1 (at this point the watermark is 0)
Element 2, Timestamp 1 (at this point the watermark is still 0)
Element 3, Timestamp 2 (at this point the watermark is 1)

Now we can process the window (0, 2), because we know from the watermark
that no more elements can arrive for that window. The window contains
elements 0, 1, and 2.

Element 4, Timestamp 3 (at this point the watermark is 2)
Element 5, Timestamp 4 (at this point the watermark is 3)

Now we can process window (2, 4). The window contains elements 3 and 4.

At this point, we have Element 5 sitting in internal buffers for window (4,
6) but if we don't receive further elements the watermark will never
advance and we will never process that window.

If, however, we get new elements at some point the watermark advances and
we don't have a problem. That's what I meant when I said that you shouldn't
have a problem if data keeps continuously arriving.
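
(As a concrete sketch of the behaviour described above: the following is a
minimal, hand-rolled periodic assigner that reproduces the "watermark =
last seen timestamp - 1" rule. It assumes the Flink 1.x DataStream API,
and for brevity the element is modelled as Tuple2<String, Long> with the
event timestamp already in f1; in the actual job this value would come
from the DTO field mentioned earlier in the thread.)

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * Mimics the ascending timestamp extractor: the watermark always trails the
 * largest timestamp seen so far by 1, because another element with that same
 * timestamp might still arrive.
 *
 * With elements at timestamps 0, 1, 1, 2 this yields watermarks -1, 0, 0, 1,
 * so window (0, 2) can fire, but an element at timestamp 4 stays buffered for
 * window (4, 6) until later elements push the watermark to at least 5.
 */
public class AscendingLikeAssigner
    implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {

  private static final long serialVersionUID = 1L;

  private long maxSeenTimestamp = Long.MIN_VALUE;

  @Override
  public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
    long ts = element.f1; // f1 stands in for the DTO's timestamp field
    maxSeenTimestamp = Math.max(maxSeenTimestamp, ts);
    return ts;
  }

  @Override
  public Watermark getCurrentWatermark() {
    return maxSeenTimestamp == Long.MIN_VALUE
        ? null // nothing seen yet, emit no watermark
        : new Watermark(maxSeenTimestamp - 1);
  }
}

(Such an assigner would typically be attached with
sourceStream.assignTimestampsAndWatermarks(new AscendingLikeAssigner())
before the coGroup.)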

Cheers,
Aljoscha



Re: [Discussion] Query regarding Join

Vinay Patil
Hi Aljoscha,

This clears a lot of doubts now.
Let's say the stream pauses for a while, or stops completely on Friday,
and assume that the last message did not get processed and is still held
in the internal buffers.

When the stream starts again on Monday, will it consider that last
element in the internal buffer for processing?
How long can the internal buffer hold the data, or will it flush the
data after some threshold?

I have tried using AssignerWithPunctuatedWatermarks and generated a
watermark for each event, but I am still getting one record less.
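
(For reference, a per-event assigner along these lines is roughly what the
attempt above would look like; this is only a sketch against the Flink 1.x
DataStream API, again modelling the element as Tuple2<String, Long> with
the event timestamp in f1. Emitting a watermark for every element usually
does not release the final window either, because the watermark still does
not reach that window's end.)

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * Emits a watermark after every element, equal to that element's timestamp.
 * The last window still does not fire: it only triggers once the watermark
 * reaches the window's last timestamp (end - 1), which the final element's
 * own timestamp usually does not reach.
 */
public class PerEventAssigner
    implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {

  private static final long serialVersionUID = 1L;

  @Override
  public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
    return element.f1; // f1 stands in for the DTO's timestamp field
  }

  @Override
  public Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) {
    return new Watermark(extractedTimestamp);
  }
}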


Regards,
Vinay Patil


Re: [Discussion] Query regarding Join

Aljoscha Krettek-2
Hi,
the element will be kept around indefinitely if no new watermark arrives.

I think the same problem will persist for AssignerWithPunctuatedWatermarks
since there you also might not get the required "last watermark" to trigger
processing of the last window.
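
(Concretely, with the default event-time trigger a window covering
[start, end) is triggered once the watermark reaches the window's last
timestamp, i.e. end - 1. A tiny plain-Java illustration, using the numbers
from the earlier watermark example:)

/** Illustrates the event-time window firing condition discussed above. */
public class WindowFiringCheck {

  /** A window [start, end) may fire once the watermark reaches end - 1. */
  static boolean canFire(long windowEnd, long watermark) {
    return watermark >= windowEnd - 1;
  }

  public static void main(String[] args) {
    System.out.println(canFire(4, 3)); // true:  window (2, 4) fires at watermark 3
    System.out.println(canFire(6, 4)); // false: window (4, 6) needs watermark >= 5,
                                       //        so the last element stays buffered
  }
}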

Cheers,
Aljoscha

On Wed, 29 Jun 2016 at 13:18 Vinay Patil <[hidden email]> wrote:

> Hi Aljoscha,
>
> This clears a lot of doubts now.
> So now lets say the stream paused for a while or it stops completely on
> Friday , let us assume that the last message did not get processed and is
> kept in the internal buffers.
>
> So when the stream starts again on Monday , will it consider the last
> element that is in the internal buffer for processing ?
>  How much time the internal buffer can hold the data or will it flush the
> data after a threshold ?
>
> I have tried using AssignerWithPunctuatedWatermarks and generated the
> watermark for each event, still getting one record less.
>
>
> Regards,
> Vinay Patil
>
> On Wed, Jun 29, 2016 at 2:21 PM, Aljoscha Krettek <[hidden email]>
> wrote:
>
> > Hi,
> > the reason why the last element might never be emitted is the way the
> > ascending timestamp extractor works. I'll try and explain with an
> example.
> >
> > Let's say we have a window size of 2 milliseconds, elements arrive
> starting
> > with timestamp 0, window begin timestamp is inclusive, end timestamp is
> > exclusive:
> >
> > Element 0, Timestamp 0 (at this point the watermark is -1)
> > Element 1, Timestamp 1 (at this point the watermark is 0)
> > Element 2, Timestamp 1 (at this point the watermark is still 0)
> > Element 3, Timestamp 2 (at this point the watermark is 1)
> >
> > now we can process the window (0, 2) because we know from the watermark
> > that no elements can arrive for that window anymore. The window contains
> > elements 0,1,2
> >
> > Element 4, Timestamp 3 (at this point the watermark is 2)
> > Element 5, Timestamp 4 (at this point the watermark is 3)
> >
> > now we can process window (2, 4). The window contains elements 3,4.
> >
> > At this point, we have Element 5 sitting in internal buffers for window
> (4,
> > 6) but if we don't receive further elements the watermark will never
> > advance and we will never process that window.
> >
> > If, however, we get new elements at some point the watermark advances and
> > we don't have a problem. That's what I meant when I said that you
> shouldn't
> > have a problem if data keeps continuously arriving.
> >
> > Cheers,
> > Aljoscha
> >
> >
> > On Tue, 28 Jun 2016 at 17:14 Vinay Patil <[hidden email]>
> wrote:
> >
> > > Hi Aljoscha,
> > >
> > > Thanks a lot for your inputs.
> > >
> > > I still did not get you when you say you will not face this issue in
> case
> > > of continuous stream, lets consider the following example :
> > > Assume that the stream runs continuously from Monday  to Friday, and on
> > > Friday it stops after 5.00 PM , will I still face this issue ?
> > >
> > > I am actually not able to understand how it will differ in real time
> > > streams.
> > >
> > > Regards,
> > > Vinay Patil
> > >
> > > On Tue, Jun 28, 2016 at 5:07 PM, Aljoscha Krettek <[hidden email]
> >
> > > wrote:
> > >
> > > > Hi,
> > > > ingestion time can only be used if you don't care about the timestamp
> > in
> > > > the elements. So if you have those you should probably use event
> time.
> > > >
> > > > If your timestamps really are strictly increasing then the ascending
> > > > extractor is good. And if you have a continuous stream of incoming
> > > elements
> > > > you will not see the behavior of not getting the last elements.
> > > >
> > > > By the way, when using Kafka you can also embed the timestamp
> extractor
> > > > directly in the Kafka consumer. This is described here:
> > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
> > > >
> > > > Cheers,
> > > > Aljoscha
> > > >
> > > > On Tue, 28 Jun 2016 at 11:44 Vinay Patil <[hidden email]>
> > > wrote:
> > > >
> > > > > Hi Aljoscha,
> > > > >
> > > > > Thank you for your response.
> > > > > So do you suggest to use different approach for extracting
> timestamp
> > > (as
> > > > > given in document) instead of AscendingTimeStamp Extractor ?
> > > > > Is that the reason I am seeing this unexpected behaviour ? in case
> of
> > > > > continuous stream I would not see any data loss ?
> > > > >
> > > > > Also assuming that the records are always going to be in order ,
> > which
> > > is
> > > > > the best approach : Ingestion Time or Event Time ?
> > > > >
> > > > >
> > > > >
> > > > > Regards,
> > > > > Vinay Patil
> > > > >
> > > > > On Tue, Jun 28, 2016 at 2:41 PM, Aljoscha Krettek <
> > [hidden email]
> > > >
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > > first regarding tumbling windows: even if you have 5 minute
> windows
> > > it
> > > > > can
> > > > > > happen that elements that are only seconds apart go into
> different
> > > > > windows.
> > > > > > Consider the following case:
> > > > > >
> > > > > > |                x | x                 |
> > > > > >
> > > > > > These are two 5-mintue windows and the two elements are only
> > seconds
> > > > > apart
> > > > > > but go into different windows because windows are aligned to
> epoch.
> > > > > >
> > > > > > Now, for the ascending timestamp extractor. The reason this can
> > > behave
> > > > in
> > > > > > unexpected ways is that it emits a watermark that is "last
> > timestamp
> > > -
> > > > > 1",
> > > > > > i.e. if it has seen timestamp t it can only emit watermark t-1
> > > because
> > > > > > there might be other elements with timestamp t arriving. If you
> > have
> > > a
> > > > > > continuous stream of elements you wouldn't notice this. Only in
> > this
> > > > > > constructed example does it become visible.
> > > > > >
> > > > > > Cheers,
> > > > > > Aljoscha
> > > > > >
> > > > > > On Tue, 28 Jun 2016 at 06:04 Vinay Patil <
> [hidden email]>
> > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > Following is the timestamp I am getting from DTO, here is the
> > > > timestamp
> > > > > > > difference between the two records :
> > > > > > > 1466115892162154279
> > > > > > > 1466116026233613409
> > > > > > >
> > > > > > > So the time difference is roughly 3 min, even if I apply the
> > window
> > > > of
> > > > > > 5min
> > > > > > > , I am not getting the last record (last timestamp value
> above),
> > > > > > > using ascending timestamp extractor for generating the
> timestamp
> > > > > > (assuming
> > > > > > > that the timestamp are always in order)
> > > > > > >
> > > > > > > I was at-least expecting data to reach the co-group function.
> > > > > > > What could be the reason for the data loss ? The data we are
> > > getting
> > > > is
> > > > > > > critical, hence we cannot afford to loose any data
> > > > > > >
> > > > > > >
> > > > > > > Regards,
> > > > > > > Vinay Patil
> > > > > > >
> > > > > > > On Mon, Jun 27, 2016 at 11:31 PM, Vinay Patil <
> > > > [hidden email]
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Just an update, when I keep IngestionTime and remove the
> > > timestamp
> > > > I
> > > > > am
> > > > > > > > generating, I am getting all the records, but for Event Time
> I
> > am
> > > > > > getting
> > > > > > > > one less record, I checked the Time Difference between two
> > > records,
> > > > > it
> > > > > > > is 3
> > > > > > > > min, I tried keeping the window time to 5 mins, but that even
> > did
> > > > not
> > > > > > > work.
> > > > > > > >
> > > > > > > > Even when I try assigning timestamp for IngestionTime, I get
> > one
> > > > > record
> > > > > > > > less, so should I safely use Ingestion Time or is it always
> > > > advisable
> > > > > > to
> > > > > > > > use EventTime ?
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Vinay Patil
> > > > > > > >
> > > > > > > > On Mon, Jun 27, 2016 at 8:16 PM, Vinay Patil <
> > > > > [hidden email]>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> Hi,
> > > > > > > >>
> > > > > > > >> Actually I am only publishing 5 messages each to two different
> > > > > > > >> kafka topics (using JUnit); even if I keep the window at 500
> > > > > > > >> seconds, the result is the same.
> > > > > > > >>
> > > > > > > >> I do not understand why it is not sending the 5th element to the
> > > > > > > >> co-group operator even when the keys are the same.
> > > > > > > >>
> > > > > > > >> I actually cannot share the actual client code.
> > > > > > > >> But this is what the streams look like:
> > > > > > > >> sourceStream.coGroup(destStream)
> > > > > > > >> Here the sourceStream and destStream are actually
> > > > > > > >> Tuple2<String,DTO>, and the ElementSelector returns tuple.f0,
> > > > > > > >> which is the key.
> > > > > > > >>
> > > > > > > >> I am generating the timestamp based on a field from the DTO which
> > > > > > > >> is guaranteed to be in order.
> > > > > > > >>
> > > > > > > >> Will using triggers help here?
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> Regards,
> > > > > > > >> Vinay Patil
> > > > > > > >>
> > > > > > > >> *+91-800-728-4749*
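For reference, a minimal sketch of what an ElementSelector over Tuple2<String, DTO> might look like; the MyDto class below is a stand-in, since the actual DTO is not shared in this thread:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

// Stand-in for the real DTO, which is not part of this thread.
class MyDto { }

public class ElementSelectorSketch implements KeySelector<Tuple2<String, MyDto>, String> {

    @Override
    public String getKey(Tuple2<String, MyDto> value) throws Exception {
        // The join/co-group key is the first tuple field, as described above.
        return value.f0;
    }
}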
> > > > > > > >>
> > > > > > > >> On Mon, Jun 27, 2016 at 7:05 PM, Aljoscha Krettek <[hidden email]> wrote:
> > > > > > > >>
> > > > > > > >>> Hi,
> > > > > > > >>> what timestamps are you assigning? Is it guaranteed that all of
> > > > > > > >>> them would fall into the same 30-second window?
> > > > > > > >>>
> > > > > > > >>> The issue with duplicate printing in the ElementSelector is
> > > > > > > >>> strange. Could you post a more complete code example so that I
> > > > > > > >>> can reproduce the problem?
> > > > > > > >>>
> > > > > > > >>> Cheers,
> > > > > > > >>> Aljoscha
> > > > > > > >>>
> > > > > > > >>> On Mon, 27 Jun 2016 at 13:21 Vinay Patil <[hidden email]> wrote:
> > > > > > > >>>
> > > > > > > >>> > Hi,
> > > > > > > >>> >
> > > > > > > >>> > I am able to get the matching and non-matching elements.
> > > > > > > >>> >
> > > > > > > >>> > However, when I am unit testing the code, I am getting one
> > > > > > > >>> > record less inside the overridden coGroup function.
> > > > > > > >>> > I am testing the following way:
> > > > > > > >>> >
> > > > > > > >>> > 1) Insert 5 messages into local kafka topic (test1)
> > > > > > > >>> > 2) Insert 5 different messages into local kafka topic (test2)
> > > > > > > >>> > 3) Consume 1) and 2) so that I have two different kafka streams
> > > > > > > >>> > 4) Generate ascending timestamps (using Event Time) for both
> > > > > > > >>> > streams and create the key (String)
> > > > > > > >>> >
> > > > > > > >>> > Now, up to 4) I am able to get all the records (checked by
> > > > > > > >>> > printing the streams to a text file).
> > > > > > > >>> >
> > > > > > > >>> > However, when I send the streams to the co-group operator, I am
> > > > > > > >>> > receiving one record less, using the following syntax:
> > > > > > > >>> >
> > > > > > > >>> > sourceStream.coGroup(destStream)
> > > > > > > >>> > .where(new ElementSelector())
> > > > > > > >>> > .equalTo(new ElementSelector())
> > > > > > > >>> > .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > > > > > > >>> > .apply(new JoinStreams());
> > > > > > > >>> >
> > > > > > > >>> > Also, I have inserted a sysout in the ElementSelector; I am
> > > > > > > >>> > getting 20 sysouts instead of 10 (10 sysouts for the source and
> > > > > > > >>> > 10 for the dest stream).
> > > > > > > >>> >
> > > > > > > >>> > I am unable to understand why one record less is reaching
> > > > > > > >>> > co-group.
> > > > > > > >>> >
> > > > > > > >>> >
> > > > > > > >>> >
> > > > > > > >>> > Regards,
> > > > > > > >>> > Vinay Patil
> > > > > > > >>> >
> > > > > > > >>> > On Wed, Jun 15, 2016 at 1:39 PM, Fabian Hueske <[hidden email]> wrote:
> > > > > > > >>> >
> > > > > > > >>> > > Can you add a flag to each element emitted by the
> > > > > > > >>> > > CoGroupFunction that indicates whether it was joined or not?
> > > > > > > >>> > > Then you can use split to distinguish between both cases and
> > > > > > > >>> > > handle both streams differently.
> > > > > > > >>> > >
> > > > > > > >>> > > Best, Fabian
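A rough sketch of that suggestion over the Integer elements used in the earlier examples: the CoGroupFunction emits Tuple2<Boolean, Integer> with the flag marking whether the element was joined, and split/select then routes the two cases separately. This is only an illustration of the idea, not code from this thread:

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.util.Collector;

import java.util.Collections;

public class FlaggedCoGroup
        implements CoGroupFunction<Integer, Integer, Tuple2<Boolean, Integer>> {

    private static final long serialVersionUID = 1L;

    @Override
    public void coGroup(Iterable<Integer> first, Iterable<Integer> second,
                        Collector<Tuple2<Boolean, Integer>> out) {
        // The group was joined only if both sides contain at least one element.
        boolean joined = first.iterator().hasNext() && second.iterator().hasNext();
        for (Integer e : first) {
            out.collect(Tuple2.of(joined, e)); // true = matched, false = unmatched
        }
        for (Integer e : second) {
            out.collect(Tuple2.of(joined, e));
        }
    }

    // Downstream, split() separates the two cases so each can go to its own sink.
    static SplitStream<Tuple2<Boolean, Integer>> splitByFlag(
            DataStream<Tuple2<Boolean, Integer>> flagged) {
        return flagged.split(new OutputSelector<Tuple2<Boolean, Integer>>() {
            @Override
            public Iterable<String> select(Tuple2<Boolean, Integer> value) {
                return Collections.singletonList(value.f0 ? "matched" : "unmatched");
            }
        });
        // The returned SplitStream's select("matched") / select("unmatched")
        // then yield the two streams to write to different sinks.
    }
}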
> > > > > > > >>> > >
> > > > > > > >>> > > 2016-06-15 6:45 GMT+02:00 Vinay Patil <[hidden email]>:
> > > > > > > >>> > >
> > > > > > > >>> > > > Hi Jark,
> > > > > > > >>> > > >
> > > > > > > >>> > > > I am able to get the non-matching elements in a stream.
> > > > > > > >>> > > >
> > > > > > > >>> > > > Of course we can collect the matching elements in the same
> > > > > > > >>> > > > stream as well; however, I want to perform additional
> > > > > > > >>> > > > operations on the joined stream before writing it to S3, so
> > > > > > > >>> > > > I would have to include a separate join operator for the
> > > > > > > >>> > > > same two streams, right?
> > > > > > > >>> > > > Correct me if I am wrong.
> > > > > > > >>> > > >
> > > > > > > >>> > > > I have pasted the dummy code which collects the non-matching
> > > > > > > >>> > > > records (I have to perform this on the actual data; correct
> > > > > > > >>> > > > me if I am doing it wrong).
> > > > > > > >>> > > >
> > > > > > > >>> > > > sourceStream.coGroup(destStream)
> > > > > > > >>> > > >     .where(new ElementSelector())
> > > > > > > >>> > > >     .equalTo(new ElementSelector())
> > > > > > > >>> > > >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > > > > > > >>> > > >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> > > > > > > >>> > > >
> > > > > > > >>> > > >         private static final long serialVersionUID = 6408179761497497475L;
> > > > > > > >>> > > >
> > > > > > > >>> > > >         @Override
> > > > > > > >>> > > >         public void coGroup(Iterable<Integer> paramIterable,
> > > > > > > >>> > > >                 Iterable<Integer> paramIterable1,
> > > > > > > >>> > > >                 Collector<Integer> paramCollector) throws Exception {
> > > > > > > >>> > > >             long exactSizeIfKnown = paramIterable.spliterator().getExactSizeIfKnown();
> > > > > > > >>> > > >             long exactSizeIfKnown2 = paramIterable1.spliterator().getExactSizeIfKnown();
> > > > > > > >>> > > >             if (exactSizeIfKnown == 0) {
> > > > > > > >>> > > >                 paramCollector.collect(paramIterable1.iterator().next());
> > > > > > > >>> > > >             } else if (exactSizeIfKnown2 == 0) {
> > > > > > > >>> > > >                 paramCollector.collect(paramIterable.iterator().next());
> > > > > > > >>> > > >             }
> > > > > > > >>> > > >         }
> > > > > > > >>> > > >     }).print();
> > > > > > > >>> > > >
> > > > > > > >>> > > > Regards,
> > > > > > > >>> > > > Vinay Patil
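A variant of the sketch above that avoids getExactSizeIfKnown() (which may return -1 when the size is not known) and forwards every unmatched element rather than only the first one; again only an illustration over Integer elements:

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.util.Collector;

public class OuterJoinCoGroup implements CoGroupFunction<Integer, Integer, Integer> {

    private static final long serialVersionUID = 1L;

    @Override
    public void coGroup(Iterable<Integer> first, Iterable<Integer> second,
                        Collector<Integer> out) {
        boolean leftEmpty = !first.iterator().hasNext();
        boolean rightEmpty = !second.iterator().hasNext();
        if (leftEmpty) {
            // Right-only group: unmatched elements from the second stream.
            for (Integer right : second) {
                out.collect(right);
            }
        } else if (rightEmpty) {
            // Left-only group: unmatched elements from the first stream.
            for (Integer left : first) {
                out.collect(left);
            }
        } else {
            // Matched group: emit the left side, mirroring the JoinFunction
            // used earlier in the thread.
            for (Integer left : first) {
                out.collect(left);
            }
        }
    }
}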
> > > > > > > >>> > > >
> > > > > > > >>> > > >
> > > > > > > >>> > > > On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <[hidden email]> wrote:
> > > > > > > >>> > > >
> > > > > > > >>> > > > > You are right; I debugged it for all elements, and I can
> > > > > > > >>> > > > > do that now.
> > > > > > > >>> > > > > Thanks a lot.
> > > > > > > >>> > > > >
> > > > > > > >>> > > > > Regards,
> > > > > > > >>> > > > > Vinay Patil
> > > > > > > >>> > > > >
> > > > > > > >>> > > > > On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <[hidden email]> wrote:
> > > > > > > >>> > > > >
> > > > > > > >>> > > > >> In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2,
> > > > > > > >>> > > > >> Collector<Integer> out)`, when both iter1 and iter2 are
> > > > > > > >>> > > > >> non-empty, they contain the matched elements from both
> > > > > > > >>> > > > >> streams.
> > > > > > > >>> > > > >> When one of iter1 and iter2 is empty, it means the elements
> > > > > > > >>> > > > >> of the other side are unmatched.
> > > > > > > >>> > > > >>
> > > > > > > >>> > > > >>
> > > > > > > >>> > > > >> - Jark Wu (wuchong)
> > > > > > > >>> > > > >>