[jira] [Created] (FLINK-21001) Flink job is blocked while using tableEnvironment with tableFunction and join

Shang Yuanchun (Jira)
Wu created FLINK-21001:
--------------------------

             Summary: Flink job is blocked while using tableEnvironment with tableFunction and join
                 Key: FLINK-21001
                 URL: https://issues.apache.org/jira/browse/FLINK-21001
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.11.2
         Environment: Flink 1.11.2
            Reporter: Wu


The reproduction code is as follows.
{code:java}
package com.oppo.recdata.datapipe;

import com.oppo.recdata.datapipe.flink.transform.ExplodeDataTypeEnum;
import com.oppo.recdata.datapipe.flink.transform.ExplodeModify;
import com.oppo.recdata.datapipe.flink.transform.TableExplode;
import com.oppo.recdata.datapipe.flink.transform.function.CollectMapAggregateFunction;
import org.apache.flink.table.api.*;

import static org.apache.flink.table.api.Expressions.row;

/**
 * @author [hidden email]
 */
public class BatchTable {
    public static void main(String[] args) {

        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

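        // Register the custom 'explode' table function, configured here to split on the '&' delimiter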
        ExplodeModify modify = new ExplodeModify(ExplodeDataTypeEnum.string, null, "&");
        tableEnv.createTemporarySystemFunction("explode", new TableExplode(modify));

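        // Register the custom 'collect_map' aggregate function used in the final query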
        tableEnv.createFunction("collect_map", CollectMapAggregateFunction.class);

        Table table = tableEnv.fromValues(
            DataTypes.ROW(
                DataTypes.FIELD("buuid", DataTypes.STRING()),
                DataTypes.FIELD("docType", DataTypes.INT()),
                DataTypes.FIELD("viewTime", DataTypes.INT()),
                DataTypes.FIELD("subCategory", DataTypes.STRING())
            ),
            row("John", "1", "36", "NBA&football")
        );

        tableEnv.createTemporaryView("feeds_expose_click_profile", table);


        Table add_profile = tableEnv.sqlQuery("select buuid, cast(docType as varchar) as docType, viewTime, subCategory from feeds_expose_click_profile where buuid is not null and docType is not null and viewTime > 0");
        tableEnv.createTemporaryView("add_profile", add_profile);

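        // Expand the '&'-separated subCategory into one row per category via LATERAL TABLE(explode(...))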
        Table cate2Click = tableEnv.sqlQuery("select buuid, docType, viewTime, cate2 from add_profile, LATERAL TABLE(explode(subCategory)) as t(cate2) where subCategory is not null");
        tableEnv.createTemporaryView("cate2_click", cate2Click);

        Table cate2_detail = tableEnv.sqlQuery("select cate2, sum(viewTime) as viewTimeSum, buuid, docType from cate2_click GROUP BY buuid, cate2, docType");
        tableEnv.createTemporaryView("user_cate2_detail", cate2_detail);

        Table user_global_cate2 = tableEnv.sqlQuery("select 'gcate2_24h_click_sumtime' as fieldName, sum(viewTime) as fieldValue,  buuid as keyName, docType from cate2_click group by buuid, docType");
        tableEnv.createTemporaryView("user_global_cate2", user_global_cate2);

        Table global_user_cate2 = tableEnv.sqlQuery("select cate2 as fieldName, sum(viewTime) as fieldValue, 'guser_cate2_24h_click_sumtime' as keyName, docType from cate2_click group by cate2, docType ");
        tableEnv.createTemporaryView("global_user_cate2",global_user_cate2);

        Table global_user_global_cate2 = tableEnv.sqlQuery("select 'guser_gcate2_24h_click_sumtime' as fieldName, sum(viewTime) as fieldValue, 'global_feature' as keyName, docType from cate2_click group by docType");
        tableEnv.createTemporaryView("global_user_global_cate2", global_user_global_cate2);

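        // Join the per-user, per-category totals against the three aggregated views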
        Table cate2_cs_detail = tableEnv.sqlQuery("select a.cate2 as fieldName, (a.viewTimeSum + 0.2) / (b.fieldValue * c.fieldValue / d.fieldValue + 0.2) as fieldValue, a.buuid as keyName, a.docType from user_cate2_detail a join user_global_cate2 b on a.buuid = b.keyName and a.docType = b.docType join global_user_cate2 c on a.cate2 = c.fieldName and a.docType = c.docType join global_user_global_cate2 d on a.docType = d.docType where a.viewTimeSum > 0 and b.fieldValue > 0 and c.fieldValue > 0 and d.fieldValue > 0");
        tableEnv.createTemporaryView("cate2_cs_detail", cate2_cs_detail);

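        // Aggregate the per-category scores into one row per keyName/docType with collect_map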
        Table cate2Cs = tableEnv.sqlQuery("select 'cate2_24h_click_sumtimeds' as fieldName, collect_map(fieldName, ROUND(fieldValue, 5)) as fieldValue, concat(docType, '#', keyName) as keyName from cate2_cs_detail  where fieldValue < 0 or fieldValue >= 0 group by keyName, docType");

        cate2Cs.execute().print();
    }
}


{code}
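Note: the classes TableExplode, ExplodeModify, ExplodeDataTypeEnum and CollectMapAggregateFunction are internal to com.oppo.recdata and are not attached to this issue. To make the reproduction self-contained, here is a minimal sketch of functionally similar UDFs, assuming that explode splits its input on the configured '&' delimiter and that collect_map collects (name, value) pairs into a map; the class names and type hints below are stand-ins, not the reporter's actual implementation.
{code:java}
// Hypothetical stand-ins for the unattached UDFs (assumed behavior, not the original classes).
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableFunction;

public class ReproFunctions {

    /** Assumed equivalent of TableExplode: emits one row per '&'-separated token. */
    public static class ExplodeFunction extends TableFunction<String> {
        public void eval(String value) {
            if (value == null) {
                return;
            }
            for (String token : value.split("&")) {
                collect(token);
            }
        }
    }

    /** Assumed equivalent of CollectMapAggregateFunction: collects (name, value) pairs into a map. */
    @FunctionHint(
            input = {@DataTypeHint("STRING"), @DataTypeHint("DOUBLE")},
            accumulator = @DataTypeHint("MAP<STRING, DOUBLE>"),
            output = @DataTypeHint("MAP<STRING, DOUBLE>"))
    public static class CollectMapFunction extends AggregateFunction<Map<String, Double>, Map<String, Double>> {

        @Override
        public Map<String, Double> createAccumulator() {
            return new HashMap<>();
        }

        public void accumulate(Map<String, Double> acc, String name, Double value) {
            acc.put(name, value);
        }

        @Override
        public Map<String, Double> getValue(Map<String, Double> acc) {
            return acc;
        }
    }
}
{code}
With these stand-ins, the job above can be run by registering them via tableEnv.createTemporarySystemFunction("explode", ExplodeFunction.class) and tableEnv.createFunction("collect_map", CollectMapFunction.class).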
The client log is as follows; after these messages the job produces no further output and remains blocked.
{code:java}
"C:\Program Files\Java\jdk1.8.0_73\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA 2018.2.5\lib\idea_rt.jar=64196:D:\Program Files\JetBrains\IntelliJ IDEA 2018.2.5\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\80242151\AppData\Local\Temp\classpath403316789.jar com.oppo.recdata.datapipe.BatchTable
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/lib/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/lib/repository/org/slf4j/slf4j-log4j12/1.7.5/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/lib/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.8.2/log4j-slf4j-impl-2.8.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/lib/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/lib/repository/org/slf4j/slf4j-log4j12/1.7.15/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2021-01-17 15:05:25,639 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2021-01-17 15:05:25,645 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2021-01-17 15:05:25,652 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2021-01-17 15:05:25,653 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2021-01-17 15:05:25,656 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2021-01-17 15:05:25,656 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)