[jira] [Created] (FLINK-12936) Support intersect all and minus all to blink planner


Shang Yuanchun (Jira)
Jingsong Lee created FLINK-12936:
------------------------------------

             Summary: Support intersect all and minus all to blink planner
                 Key: FLINK-12936
                 URL: https://issues.apache.org/jira/browse/FLINK-12936
             Project: Flink
          Issue Type: New Feature
          Components: Table SQL / Planner
            Reporter: Jingsong Lee
            Assignee: Jingsong Lee


Currently, the blink planner only supports intersect and minus (see ReplaceIntersectWithSemiJoinRule and ReplaceMinusWithAntiJoinRule). These rules replace intersect with a null-aware semi-join plus a distinct aggregate, and minus with the corresponding anti-join rewrite.

We need to support intersect all and minus all as well.

Presto and Spark already support them:

[https://github.com/prestodb/presto/issues/4918]

https://issues.apache.org/jira/browse/SPARK-21274

I think they have a good rewrite design that we can follow:

1. For intersect all:

Input Query
{code:java}
SELECT c1 FROM ut1 INTERSECT ALL SELECT c1 FROM ut2
{code}
Rewritten Query
{code:java}
  SELECT c1
    FROM (
         SELECT replicate_rows(min_count, c1)
         FROM (
              SELECT c1,
                     IF (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS min_count
              FROM (
                   SELECT   c1, count(vcol1) as vcol1_cnt, count(vcol2) as vcol2_cnt
                   FROM (
                        SELECT c1, true as vcol1, null as vcol2 FROM ut1
                        UNION ALL
                        SELECT c1, null as vcol1, true as vcol2 FROM ut2
                        ) AS union_all
                   GROUP BY c1
                   HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1
                  )
              )
          )
{code}
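To make the semantics of this rewrite easier to check, here is a minimal Python sketch that mimics each step on in-memory lists: tag and union the two inputs, count the markers per key, keep keys present on both sides, take the smaller count, and replicate the row that many times. The function name `intersect_all` and the sample data are illustrative assumptions only; this is not planner code from Flink, Spark, or Presto.

```python
from collections import defaultdict


def intersect_all(ut1, ut2):
    """Emulate the INTERSECT ALL rewrite on two lists of c1 values."""
    # UNION ALL of both inputs with marker columns (c1, vcol1, vcol2).
    union_all = [(c1, True, None) for c1 in ut1] + \
                [(c1, None, True) for c1 in ut2]

    # GROUP BY c1 with count(vcol1) and count(vcol2); count() skips NULLs.
    counts = defaultdict(lambda: [0, 0])
    for c1, vcol1, vcol2 in union_all:
        if vcol1 is not None:
            counts[c1][0] += 1
        if vcol2 is not None:
            counts[c1][1] += 1

    result = []
    for c1, (vcol1_cnt, vcol2_cnt) in counts.items():
        # HAVING vcol1_cnt >= 1 AND vcol2_cnt >= 1
        if vcol1_cnt >= 1 and vcol2_cnt >= 1:
            # IF (vcol1_cnt > vcol2_cnt, vcol2_cnt, vcol1_cnt) AS min_count
            min_count = vcol2_cnt if vcol1_cnt > vcol2_cnt else vcol1_cnt
            # replicate_rows(min_count, c1)
            result.extend([c1] * min_count)
    return result


if __name__ == "__main__":
    print(sorted(intersect_all([1, 1, 2, 3], [1, 1, 1, 2, 4])))
```

With these sample inputs, the value 1 appears twice on the left and three times on the right, so it is emitted twice; 2 is emitted once; 3 and 4 each appear on only one side and are dropped.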
2. For minus all:

Input Query
{code:java}
SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2
{code}
Rewritten Query
{code:java}
SELECT c1
  FROM (
    SELECT replicate_rows(sum_val, c1)
      FROM (
        SELECT c1, sum_val
          FROM (
            SELECT c1, sum(vcol) AS sum_val
              FROM (
                SELECT 1L as vcol, c1 FROM ut1
                UNION ALL
                SELECT -1L as vcol, c1 FROM ut2
              ) AS union_all
             GROUP BY union_all.c1
          )
         WHERE sum_val > 0
      )
  )
{code}
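The same idea can be sketched for minus all. The Python snippet below follows the rewritten query step by step: rows from the left input carry +1, rows from the right input carry -1, the values are summed per key, and only keys with a positive sum are replicated. The function name `except_all` and the sample data are hypothetical and only serve to illustrate the expected result.

```python
from collections import defaultdict


def except_all(ut1, ut2):
    """Emulate the EXCEPT ALL (minus all) rewrite on two lists of c1 values."""
    # UNION ALL: +1 for every row of ut1, -1 for every row of ut2.
    union_all = [(1, c1) for c1 in ut1] + [(-1, c1) for c1 in ut2]

    # GROUP BY c1 with sum(vcol) AS sum_val.
    sums = defaultdict(int)
    for vcol, c1 in union_all:
        sums[c1] += vcol

    result = []
    for c1, sum_val in sums.items():
        # WHERE sum_val > 0
        if sum_val > 0:
            # replicate_rows(sum_val, c1)
            result.extend([c1] * sum_val)
    return result


if __name__ == "__main__":
    print(sorted(except_all([1, 1, 1, 2, 3], [1, 2, 2])))
```

Here the value 1 has a sum of 3 - 1 = 2 and is emitted twice, 2 has a sum of 1 - 2 = -1 and is filtered out, and 3 has a sum of 1 and is emitted once.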



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)