Can I do a lookup in FlinkSQL?


srikanth flink
Hi there,

I'm working with streaming in FlinkSQL. I have created two tables: one backed
by a dynamic stream and the other by periodic updates.
I would like to keep the periodic table static (it is refreshed with new data
every day or so, flushing the old data), so that at any point in time the
static table contains the latest set of data.
While the dynamic table is being populated with stream data, can I do a lookup
on a column of the static table to check whether a value exists?

This is what I have done:
dynamic table: sourceKafka
static table: badips

I tried to build a list using the ROW() function and then, from the dynamic
table, to look up whether the value exists in that list.
Query: INSERT INTO sourceKafkaMalicious select s.* from sourceKafka as s
where s.`source.ip` OR s.`destination.ip` IN (select ROW(ip) from badips);
Response:
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Values passed to IN
operator must have compatible types

Is it possible to solve my use case? If so, where am I going wrong?

Thanks
Srikanth

Re: Can I do a lookup in FlinkSQL?

zhangjun
You can try using a UDTF (user-defined table function).
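
For illustration, a UDTF-based lookup could be joined to the stream roughly as sketched below. This assumes a table function, given the hypothetical name lookupBadIp here, has already been implemented and registered. It is assumed to take the two IP columns and emit a single row when either one is in the bad-IP list, and no row otherwise. How the function loads and refreshes the list is left open.

```sql
-- Sketch only: lookupBadIp is a hypothetical, user-registered table function.
-- It is assumed to emit at most one row (the matching IP) per call.
INSERT INTO sourceKafkaMalicious
SELECT s.*
FROM sourceKafka AS s,
     LATERAL TABLE(lookupBadIp(s.`source.ip`, s.`destination.ip`)) AS t(matched_ip);
```

Because the comma join with LATERAL TABLE behaves like an inner join, stream rows for which the function emits nothing are dropped from the result.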





------------------ Original ------------------
From: srikanth flink <[hidden email]>
Date: Mon, Sep 16, 2019 9:23 PM
To: dev <[hidden email]>, user <[hidden email]>, user-ml <[hidden email]>
Subject: Re: Can I do a lookup in FlinkSQL?




Re: Can I do a lookup in FlinkSQL?

Zhenghua Gao
In reply to this post by srikanth flink
A lookup-style temporal join [1] should be a solution for your case, and
there is an ITCase you can use as an example [2].

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LookupableTableSource.java
[2]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala


*Best Regards,*
*Zhenghua Gao*
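
As a rough sketch of the lookup join suggested above, the query could take the following shape. It assumes that sourceKafka exposes a processing-time attribute (called proctime here, a name not given in the thread), that badips is backed by a connector implementing LookupableTableSource with ip as the lookup key, and that the Blink planner is in use.

```sql
-- Sketch only: proctime is an assumed processing-time attribute on sourceKafka,
-- and badips is assumed to be a lookupable table keyed on ip.
INSERT INTO sourceKafkaMalicious
SELECT s.*
FROM sourceKafka AS s
LEFT JOIN badips FOR SYSTEM_TIME AS OF s.proctime AS src
  ON s.`source.ip` = src.ip
LEFT JOIN badips FOR SYSTEM_TIME AS OF s.proctime AS dst
  ON s.`destination.ip` = dst.ip
-- Keep a stream row when either address was found in badips.
WHERE src.ip IS NOT NULL OR dst.ip IS NOT NULL;
```

Each stream row is looked up against the current contents of badips when it is processed, so a daily refresh of that table is picked up without restarting the query (subject to any caching in the connector). Note that s.* would also carry the assumed proctime column, so the sink schema would need to match or the columns would need to be listed explicitly.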

