Hi there,
I'm working with streaming in FlinkSQL. I've created two tables: one fed by a dynamic stream and the other by periodic updates. I would like to keep the periodic table static (it is refreshed with new data every day or so by flushing the old), so that at any point in time the static table contains the latest set of data. While the dynamic table is being populated with stream data, could I do a lookup on a column of the static table to check whether a value exists?

This is what I have done:
dynamic table: sourceKafka
static table: badips

I tried to build a list using the ROW() function and then, from the dynamic table, to look up whether the value exists in that list.

Query:
INSERT INTO sourceKafkaMalicious select s.* from sourceKafka as s where s.`source.ip` OR s.`destination.ip` IN (select ROW(ip) from badips);

Response:
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Values passed to IN operator must have compatible types

Is it possible to solve my use case? If so, where am I going wrong?

Thanks
Srikanth
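For reference, the validator error most likely comes from two things in the query above: `ROW(ip)` produces a row type, which is not comparable with a plain string column, and `s.`source.ip` OR ...` uses a string where a boolean is expected, because IN binds only to `s.`destination.ip``. A minimal sketch of the predicate with those two points addressed is below. It assumes `ip`, `source.ip` and `destination.ip` are all of the same string type, and whether a given planner version accepts an IN sub-query on a streaming table is also an assumption.

```sql
-- Sketch only: compare the column directly with the sub-query column
-- instead of wrapping it in ROW(), and give each column its own IN test.
-- Assumes badips.ip and sourceKafka.`source.ip` share the same type.
INSERT INTO sourceKafkaMalicious
SELECT s.*
FROM sourceKafka AS s
WHERE s.`source.ip` IN (SELECT ip FROM badips);

-- The destination address can be checked the same way in a second statement.
INSERT INTO sourceKafkaMalicious
SELECT s.*
FROM sourceKafka AS s
WHERE s.`destination.ip` IN (SELECT ip FROM badips);
```

Two separate statements are used here because combining IN sub-queries with OR may not be supported by every planner; a row whose source and destination addresses both match would then be written twice.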
You can try using a UDTF (user-defined table function).
------------------ Original ------------------
From: srikanth flink <[hidden email]>
Date: Mon, Sep 16, 2019 9:23 PM
To: dev <[hidden email]>, user <[hidden email]>, user-ml <[hidden email]>
Subject: Re: Can I do a lookup in FlinkSQL?
In reply to this post by srikanth flink
A lookup-style temporal join [1] should solve your case, and there is an ITCase that serves as an example [2].

[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LookupableTableSource.java
[2] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala

Best Regards,
Zhenghua Gao
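A lookup join of this kind could be sketched roughly as follows. This is only an illustration under assumptions that are not stated in the thread: `sourceKafka` is assumed to expose a processing-time attribute (called `proctime` here, a hypothetical name), and `badips` is assumed to be backed by a LookupableTableSource keyed on `ip`.

```sql
-- Sketch only. Assumptions: s.proctime is a processing-time attribute on
-- sourceKafka (hypothetical column name), and badips is a lookupable table.
-- Each incoming row looks up badips as of its processing time, so refreshed
-- contents of badips are picked up by later rows.
INSERT INTO sourceKafkaMalicious
SELECT s.*
FROM sourceKafka AS s
JOIN badips FOR SYSTEM_TIME AS OF s.proctime AS b
  ON s.`source.ip` = b.ip;
```

The join condition needs an equality on the lookup key, so matching on `destination.ip` would take a second statement of the same shape. Note that `s.*` also selects the `proctime` column, so the sink schema would have to account for it or the columns would need to be listed explicitly.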