[jira] [Created] (FLINK-17093) Python UDF doesn't work when the input column is of composite type

Shang Yuanchun (Jira)
Dian Fu created FLINK-17093:
-------------------------------

             Summary: Python UDF doesn't work when the input column is of composite type
                 Key: FLINK-17093
                 URL: https://issues.apache.org/jira/browse/FLINK-17093
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.10.0
            Reporter: Dian Fu
            Assignee: Dian Fu
             Fix For: 1.10.1, 1.11.0


For the following job:
{code}

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import BatchTableEnvironment, StreamTableEnvironment, EnvironmentSettings, CsvTableSink
from pyflink.table.descriptors import Schema, Kafka, Json
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf
import os

@udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
                  DataTypes.STRING()],
     result_type=DataTypes.STRING())
def get_host_ip(source, qr, sip, dip):
    if source == "NGAF" and qr == '1':
        return dip
    return sip


@udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
                  DataTypes.STRING()],
     result_type=DataTypes.STRING())
def get_dns_server_ip(source, qr, sip, dip):
    if source == "NGAF" and qr == '1':
        return sip
    return dip


def test_case():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(env)

    from pyflink.table import Row
    # The source table has a composite (ROW) column "data".
    table = t_env.from_elements(
        [("DNS", Row(source="source", devid="devid", sip="sip", dip="dip", qr="qr",
                     queries="queries", answers="answers", qtypes="qtypes",
                     atypes="atypes", rcode="rcode", ts="ts"))],
        DataTypes.ROW([
            DataTypes.FIELD("stype", DataTypes.STRING()),
            DataTypes.FIELD("data", DataTypes.ROW([
                DataTypes.FIELD("source", DataTypes.STRING()),
                DataTypes.FIELD("devid", DataTypes.STRING()),
                DataTypes.FIELD("sip", DataTypes.STRING()),
                DataTypes.FIELD("dip", DataTypes.STRING()),
                DataTypes.FIELD("qr", DataTypes.STRING()),
                DataTypes.FIELD("queries", DataTypes.STRING()),
                DataTypes.FIELD("answers", DataTypes.STRING()),
                DataTypes.FIELD("qtypes", DataTypes.STRING()),
                DataTypes.FIELD("atypes", DataTypes.STRING()),
                DataTypes.FIELD("rcode", DataTypes.STRING()),
                DataTypes.FIELD("ts", DataTypes.STRING())]))]))

    result_file = "/tmp/test.csv"
    if os.path.exists(result_file):
        os.remove(result_file)

    t_env.register_table_sink(
        "Results",
        CsvTableSink(
            ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n'],
            [DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
             DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
             DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
             DataTypes.STRING(), DataTypes.STRING()],
            result_file))

    t_env.register_function("get_host_ip", get_host_ip)
    t_env.register_function("get_dns_server_ip", get_dns_server_ip)

    t_env.register_table("source", table)
    # Flatten the composite column; the UDF inputs come from fields of "data".
    standard_table = t_env.sql_query("select data.*, stype as dns_type from source") \
        .where("dns_type.in('DNSFULL', 'DNS', 'DNSFULL_FROM_LOG', 'DNS_FROM_LOG')")
    t_env.register_table("standard_table", standard_table)

    final_table = t_env.sql_query(
        "SELECT *, get_host_ip(source, qr, sip, dip) as host_ip, "
        "get_dns_server_ip(source, qr, sip, dip) as dns_server_ip FROM standard_table")

    final_table.insert_into("Results")

    t_env.execute("test")


if __name__ == '__main__':
    test_case()
{code}

The generated plan is as follows, which is not correct:
{code}
 org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaTableSource(type, data) -> Map -> where: (IN(type, _UTF-16LE'DNSFULL', _UTF-16LE'DNS', _UTF-16LE'DNSFULL_FROM_LOG', _UTF-16LE'DNS_FROM_LOG')), select: (data, type) -> select: (type, get_host_ip(type.source, type.qr, type.sip, type.dip) AS f0, get_dns_server_ip(type.source, type.qr, type.sip, type.dip) AS f1) -> select: (f0.source AS source, f0.devid AS devid, f0.sip AS sip, f0.dip AS dip, f0.qr AS qr, f0.queries AS queries, f0.answers AS answers, f0.qtypes AS qtypes, f0.atypes AS atypes, f0.rcode AS rcode, f0.ts AS ts, type AS dns_type, f0 AS host_ip, f1 AS dns_server_ip) -> to: Row -> Sink: KafkaTableSink(source, devid, sip, dip, qr, queries, answers, qtypes, atypes, rcode, ts, dns_type, host_ip, dns_server_ip) (1/4) (8d064ab137866a2a9040392a87bcc59d) switched from RUNNING to FAILED.
{code}
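
In the plan above, the nested-field accesses for the UDF inputs appear to be resolved against the first column (type.source, type.qr, ...) rather than against the composite data column, which is consistent with the reported symptom. Below is a minimal sketch, not taken from the original report, that distills the pattern triggering the issue: a Python UDF whose inputs are fields of a composite (ROW) column. The identity UDF and the two-field schema are illustrative assumptions.
{code}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes, Row
from pyflink.table.udf import udf


# Trivial UDF used only to force a Python UDF call on a nested field.
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def identity(s):
    return s


env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.register_function("identity", identity)

# "data" is a composite (ROW) column; the UDF input "data.a" is a nested field.
t_env.register_table("source", t_env.from_elements(
    [("x", Row(a="a", b="b"))],
    DataTypes.ROW([
        DataTypes.FIELD("stype", DataTypes.STRING()),
        DataTypes.FIELD("data", DataTypes.ROW([
            DataTypes.FIELD("a", DataTypes.STRING()),
            DataTypes.FIELD("b", DataTypes.STRING())]))])))

# The Python UDF input originates from the composite column; on the affected
# versions the planner resolves such inputs incorrectly.
t_env.sql_query("SELECT identity(data.a) FROM source")
{code}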


