Hi Rakesh Kumar,
I think you can use the interval join, e.g.:
orderStream
    .keyBy(<KeySelector>)
    .intervalJoin(invoiceStream.keyBy(<KeySelector>))
    .between(Time.minutes(-5), Time.minutes(5))
    .process(<ProcessJoinFunction>);
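To make the between(Time.minutes(-5), Time.minutes(5)) bound more concrete, here is a small plain-Java sketch of the condition that, as I understand the docs, the interval join checks for two elements with the same key: orderTs + lowerBound <= invoiceTs <= orderTs + upperBound, with both bounds inclusive by default. This is not Flink code; the class IntervalJoinSketch, the Event type, and the method names are made up purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of the interval-join bound; this is NOT Flink code.
// All names here (IntervalJoinSketch, Event, withinBounds, join) are hypothetical.
class IntervalJoinSketch {

    // Minimal event: a join key (order_id) and an event-time timestamp in milliseconds.
    static final class Event {
        final String orderId;
        final long timestampMs;

        Event(String orderId, long timestampMs) {
            this.orderId = orderId;
            this.timestampMs = timestampMs;
        }
    }

    // True when leftTs + lowerMs <= rightTs <= leftTs + upperMs (both bounds inclusive).
    static boolean withinBounds(long leftTs, long rightTs, long lowerMs, long upperMs) {
        return rightTs >= leftTs + lowerMs && rightTs <= leftTs + upperMs;
    }

    // Pairs each order with every invoice that has the same key and lies inside the interval.
    static List<String> join(List<Event> orders, List<Event> invoices,
                             long lowerMs, long upperMs) {
        List<String> out = new ArrayList<>();
        for (Event order : orders) {
            for (Event invoice : invoices) {
                if (order.orderId.equals(invoice.orderId)
                        && withinBounds(order.timestampMs, invoice.timestampMs,
                                        lowerMs, upperMs)) {
                    out.add(order.orderId + ":" + order.timestampMs
                            + "->" + invoice.timestampMs);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long fiveMin = 5 * 60 * 1000L;
        List<Event> orders = Arrays.asList(new Event("o1", 0L), new Event("o2", 0L));
        List<Event> invoices = Arrays.asList(
                new Event("o1", 4 * 60 * 1000L),   // 4 min after its order: inside the bound
                new Event("o2", 6 * 60 * 1000L));  // 6 min after its order: outside the bound
        System.out.println(join(orders, invoices, -fiveMin, fiveMin));
    }
}
```

If invoices always arrive after their order, a lower bound of 0 instead of -5 minutes would express the same check with a tighter interval; that is an assumption about your data, so adjust it as needed.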
The semantics of the interval join and a detailed usage description can be found at
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/joining.html#interval-join
Hope this helps, and any feedback is welcome!
Best,
Jincheng
Rakesh Kumar <[hidden email]> wrote on Thu, Dec 6, 2018 at 7:10 PM:
> Hi,
> I have two data sources, one for order data and another for invoice
> data, and I am pushing both into a Kafka topic in JSON form. I wanted to
> delay the order data by 5 minutes because invoice data arrives only after
> the order data is generated. For that, I have written a Flink program which
> reads both kinds of data from Kafka, applies watermarks, and delays the
> order data by 5 minutes. After applying watermarks, I wanted to join the
> data on order_id, which is present in both the order and the invoice
> data. After joining, I wanted to push the result to a different Kafka topic.
>
> But I am not able to join these data streams with the 5-minute delay, and I
> have not been able to figure out why.
>
> I am attaching my Flink program and its dependency below.
>