Dian Fu created FLINK-17093:
------------------------------- Summary: Python UDF doesn't work when the input column is of composite type Key: FLINK-17093 URL: https://issues.apache.org/jira/browse/FLINK-17093 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.10.0 Reporter: Dian Fu Assignee: Dian Fu Fix For: 1.10.1, 1.11.0 For the following job: {code} from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import BatchTableEnvironment, StreamTableEnvironment, EnvironmentSettings, CsvTableSink from pyflink.table.descriptors import Schema, Kafka, Json from pyflink.table import DataTypes from pyflink.table.udf import ScalarFunction, udf import os @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING()) def get_host_ip(source, qr, sip, dip): if source == "NGAF" and qr == '1': return dip return sip @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], result_type=DataTypes.STRING()) def get_dns_server_ip(source, qr, sip, dip): if source == "NGAF" and qr == '1': return sip return dip def test_case(): env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) t_env = StreamTableEnvironment.create(env) from pyflink.table import Row table = t_env.from_elements( [("DNS", Row(source="source", devid="devid", sip="sip", dip="dip", qr="qr", queries="queries", answers="answers", qtypes="qtypes", atypes="atypes", rcode="rcode", ts="ts",))], DataTypes.ROW([DataTypes.FIELD("stype", DataTypes.STRING()), DataTypes.FIELD("data", DataTypes.ROW([DataTypes.FIELD('source', DataTypes.STRING()), DataTypes.FIELD("devid", DataTypes.STRING()), DataTypes.FIELD('sip', DataTypes.STRING()), DataTypes.FIELD('dip', DataTypes.STRING()), DataTypes.FIELD("qr", DataTypes.STRING()), DataTypes.FIELD("queries", DataTypes.STRING()), DataTypes.FIELD("answers", DataTypes.STRING()), DataTypes.FIELD("qtypes", DataTypes.STRING()), 
DataTypes.FIELD("atypes", DataTypes.STRING()), DataTypes.FIELD("rcode", DataTypes.STRING()), DataTypes.FIELD("ts", DataTypes.STRING())])) ] )) result_file = "/tmp/test.csv" if os.path.exists(result_file): os.remove(result_file) t_env.register_table_sink("Results", CsvTableSink(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n'], [DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()], "/tmp/test.csv")) t_env.register_function("get_host_ip", get_host_ip) t_env.register_function("get_dns_server_ip", get_dns_server_ip) t_env.register_table("source", table) standard_table = t_env.sql_query("select data.*, stype as dns_type from source")\ .where("dns_type.in('DNSFULL', 'DNS', 'DNSFULL_FROM_LOG', 'DNS_FROM_LOG')") t_env.register_table("standard_table", standard_table) final_table = t_env.sql_query("SELECT *, get_host_ip(source, qr, sip, dip) as host_ip," "get_dns_server_ip(source, qr, sip, dip) as dns_server_ip FROM standard_table") final_table.insert_into("Results") t_env.execute("test") if __name__ == '__main__': test_case() {code} The plan is as follows, which is not correct: {code} org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaTableSource(type, data) -> Map -> where: (IN(type, _UTF-16LE'DNSFULL', _UTF-16LE'DNS', _UTF-16LE'DNSFULL_FROM_LOG', _UTF-16LE'DNS_FROM_LOG')), select: (data, type) -> select: (type, get_host_ip(type.source, type.qr, type.sip, type.dip) AS f0, get_dns_server_ip(type.source, type.qr, type.sip, type.dip) AS f1) -> select: (f0.source AS source, f0.devid AS devid, f0.sip AS sip, f0.dip AS dip, f0.qr AS qr, f0.queries AS queries, f0.answers AS answers, f0.qtypes AS qtypes, f0.atypes AS atypes, f0.rcode AS rcode, f0.ts AS ts, type AS dns_type, f0 AS host_ip, f1 AS dns_server_ip) -> to: 
Row -> Sink: KafkaTableSink(source, devid, sip, dip, qr, queries, answers, qtypes, atypes, rcode, ts, dns_type, host_ip, dns_server_ip) (1/4) (8d064ab137866a2a9040392a87bcc59d) switched from RUNNING to FAILED. {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |