Join of three streams with different frequency

Dominik Wosiński
Hello,
I wanted to ask whether my general approach is correct, or whether Flink
offers something better for this.

Say that I have three different streams:

   - currency_codes, which holds the name of each currency. It has the
   fields (*currency_iso_code, tst, currency_name*).
   - exchange_rate, a rarely changing stream with the exchange rates for
   different currencies. It has the fields (*currency_iso_code, tst, rate*).
   - fares, which represents taxi fares. It has the fields
   (*currency_iso_code, tst, price*).

Let's assume this is a simple system that allows drivers to charge clients
in different currencies. In the end we want all calculations in USD, but we
also want to keep the *name* of the original currency.

My current idea is this: since *currency_codes* holds the currency names and
rarely changes (say, once a month at most), we could use it as
*BroadcastState* to join it with *exchange_rate*. Then, once we have both the
name of the foreign currency and its exchange rate, we can use a LATERAL
TABLE join to join the result with *fares*.

Are my assumptions correct, and is there a better way to solve this
problem?

Thanks in advance,
Best Regards,
Dom.
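
A minimal sketch of the broadcast-state step proposed above, written as plain Java rather than Flink code so that it runs on its own. The two methods only roughly mirror `processBroadcastElement` and `processElement` of Flink's `BroadcastProcessFunction`; the class, record and method names are hypothetical. It highlights one property of this design: broadcast state is essentially a map updated on arrival, so each exchange rate is enriched with whatever name is known at that moment, not with the name that was valid at the rate's `tst`.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the proposed broadcast-state enrichment (no Flink APIs).
// All names here are hypothetical and chosen for illustration only.
final class BroadcastEnrichmentSketch {

    record CurrencyCode(String isoCode, long tst, String name) {}
    record ExchangeRate(String isoCode, long tst, double rate) {}
    record NamedRate(String isoCode, long tst, String name, double rate) {}

    // Stand-in for the broadcast state: latest known name per ISO code.
    private final Map<String, String> namesByIso = new HashMap<>();

    // Roughly what processBroadcastElement would do for currency_codes:
    // overwrite the entry, regardless of the record's tst.
    void onCurrencyCode(CurrencyCode code) {
        namesByIso.put(code.isoCode(), code.name());
    }

    // Roughly what processElement would do for exchange_rate: read the name
    // known at arrival time (null if the code has not been seen yet).
    NamedRate onExchangeRate(ExchangeRate rate) {
        String name = namesByIso.get(rate.isoCode());
        return new NamedRate(rate.isoCode(), rate.tst(), name, rate.rate());
    }

    public static void main(String[] args) {
        BroadcastEnrichmentSketch op = new BroadcastEnrichmentSketch();
        // The rate arrives before the (older) currency code record.
        System.out.println(op.onExchangeRate(new ExchangeRate("EUR", 5L, 1.5)));
        op.onCurrencyCode(new CurrencyCode("EUR", 0L, "Euro"));
        System.out.println(op.onExchangeRate(new ExchangeRate("EUR", 6L, 1.25)));
    }
}
```

Running `main` prints the first rate with a `null` name, because its currency code had not arrived yet, and the second one with the name `Euro`. Ordering effects like this are one reason to consider a join that is aware of event time.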

Re: Join of three streams with different frequency

Piotr Nowojski-3
Hi,

What you describe is basically the motivating example for Temporal Table Joins [1]. Please take a look at them.

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/temporal_tables.html

> On 20 Jan 2020, at 17:02, Dominik Wosiński <[hidden email]> wrote:

Re: Join of three streams with different frequency

Dominik Wosiński
Hey,
I have considered Temporal Table Joins, but as far as I can tell from the
docs, they are currently supported only in the Blink planner and do not
support event-time joins, which is why I was looking for a different
solution. Has anything changed, or is my understanding incorrect?

Best Regards,
Dom.

On Tue, 21 Jan 2020 at 11:08, Piotr Nowojski <[hidden email]> wrote:

Re: Join of three streams with different frequency

Piotr Nowojski-3
In the Flink planner, this functionality is supported and exposed via Temporal Table Functions [2], and it does support event time [3]. Only the non-function Temporal Table syntax is limited to the Blink planner, but as far as I recall the functionality is the same.

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#join-with-a-temporal-table-function
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#event-time-temporal-joins

Piotrek
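
To illustrate the event-time semantics referred to here, below is a plain-Java model (not the Flink API) of what an event-time temporal join computes: each fare is matched with the latest rate version whose `tst` is not after the fare's `tst`. The class and method names are hypothetical, and the sketch assumes that `rate` is expressed in USD per one unit of the foreign currency. The real operator additionally waits for watermarks before emitting results, which this sketch does not model.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of event-time temporal-join semantics (not Flink code):
// each fare is matched with the rate version valid at the fare's tst.
final class TemporalLookupSketch {

    record Fare(String isoCode, long tst, double price) {}
    record UsdFare(String isoCode, long tst, double priceUsd) {}

    // Versioned "table": ISO code -> (version timestamp -> rate).
    private final Map<String, TreeMap<Long, Double>> rates = new HashMap<>();

    void addRate(String isoCode, long tst, double rate) {
        rates.computeIfAbsent(isoCode, k -> new TreeMap<>()).put(tst, rate);
    }

    // Returns null when no rate version exists at or before the fare's tst.
    UsdFare join(Fare fare) {
        TreeMap<Long, Double> versions = rates.get(fare.isoCode());
        if (versions == null) {
            return null;
        }
        // floorEntry: greatest version timestamp <= fare.tst()
        Map.Entry<Long, Double> version = versions.floorEntry(fare.tst());
        if (version == null) {
            return null;
        }
        // Assumption: rate = USD per one unit of the foreign currency.
        return new UsdFare(fare.isoCode(), fare.tst(), fare.price() * version.getValue());
    }

    public static void main(String[] args) {
        TemporalLookupSketch table = new TemporalLookupSketch();
        table.addRate("EUR", 10L, 1.5);
        table.addRate("EUR", 20L, 1.25);
        System.out.println(table.join(new Fare("EUR", 15L, 100.0))); // uses version 10
        System.out.println(table.join(new Fare("EUR", 25L, 100.0))); // uses version 20
        System.out.println(table.join(new Fare("EUR", 5L, 100.0)));  // no version yet
    }
}
```

Unlike the broadcast-state approach, the result for a given fare does not depend on when the rate record happened to arrive, only on its `tst`.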

> On 21 Jan 2020, at 11:25, Dominik Wosiński <[hidden email]> wrote:

Re: Join of three streams with different frequency

Dominik Wosiński
Thanks!

I will take a look at the provided links.
Best Regards,
Dom.

On Tue, 21 Jan 2020 at 12:35, Piotr Nowojski <[hidden email]> wrote:

Re: Join of three streams with different frequency

Kurt Young
Temporal table functions are supported by both planners, so feel free to use
either of them.

Best,
Kurt

On Tue, 21 Jan 2020 at 19:39, Dominik Wosiński <[hidden email]> wrote:

Re: Join of three streams with different frequency

Dominik Wosiński
Hey,
But doesn't a temporal table join emit output only when a watermark arrives?
I found this information here:
https://stackoverflow.com/questions/54441594/how-to-use-flink-temporal-tables
Since one of the tables will have data that changes very rarely, wouldn't that
mean a temporal table join always has to wait for the slow stream to advance
its watermark?

Best Regards,
Dom.

Re: Join of three streams with different frequency

Piotr Nowojski-3
Hey,

Yes, that's true for event-time temporal table joins. You would have to make sure that watermarks keep progressing, for example by emitting them periodically [1] or with some other custom logic. I'm not sure what works out of the box in the Table API, but you can always register a DataStream as a Table.

For the record, processing-time temporal table joins emit updates immediately [2].

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#timestamp-assigners--watermark-generators
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#processing-time-temporal-joins
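
The following simplified plain-Java model (not Flink code; all names are hypothetical) illustrates the problem discussed here. An operator with two inputs uses the minimum of its inputs' watermarks as its own watermark, and the event-time temporal join is modelled, in simplified form, as holding each fare until that combined watermark reaches the fare's `tst`. As long as the slow rates stream does not advance its watermark, nothing is emitted, however fast fares arrive. Advancing the rates watermark, for instance periodically as suggested above, releases the buffered fares.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Simplified plain-Java model (not Flink code) of watermark-gated emission in
// an event-time join with two inputs. All names are hypothetical.
final class WatermarkGateSketch {

    private long faresWatermark = Long.MIN_VALUE;
    private long ratesWatermark = Long.MIN_VALUE;
    // Timestamps of fares waiting for the combined watermark to pass them.
    private final PriorityQueue<Long> pendingFares = new PriorityQueue<>();

    // Buffers a fare; late fares (already behind the watermark) are not
    // treated specially in this sketch.
    void onFare(long tst) {
        pendingFares.add(tst);
    }

    List<Long> onFaresWatermark(long wm) {
        faresWatermark = Math.max(faresWatermark, wm);
        return flush();
    }

    List<Long> onRatesWatermark(long wm) {
        ratesWatermark = Math.max(ratesWatermark, wm);
        return flush();
    }

    // Returns every buffered fare whose tst is <= the minimum of the two
    // input watermarks, in timestamp order.
    private List<Long> flush() {
        long combined = Math.min(faresWatermark, ratesWatermark);
        List<Long> emitted = new ArrayList<>();
        while (!pendingFares.isEmpty() && pendingFares.peek() <= combined) {
            emitted.add(pendingFares.poll());
        }
        return emitted;
    }

    public static void main(String[] args) {
        WatermarkGateSketch join = new WatermarkGateSketch();
        join.onFare(100L);
        join.onFare(200L);
        System.out.println(join.onFaresWatermark(250L)); // [] (rates watermark never advanced)
        System.out.println(join.onRatesWatermark(150L)); // [100]
        System.out.println(join.onRatesWatermark(300L)); // [200]
    }
}
```

Note that in the last step the fares watermark (250), not the rates watermark (300), becomes the limiting input.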

> On 29 Jan 2020, at 11:48, Dominik Wosiński <[hidden email]> wrote: