---------- Forwarded message ---------
发件人: Xingbo Huang <[hidden email]> Date: 2020年5月29日周五 下午4:30 Subject: Re: pyflink Table Api连接 外部系统问题 To: 刘亚坤 <[hidden email]> 你好, 你想问的应该是如何把kafka里面的一整个json数据当成一个string读进来,然后不做任何format解析对吧。如果是这样的话,我的理解是,你首先不能用json format,需要使用csv format,然后你得指定一个field_delimiter,默认的是逗号,你得换一个,比如\n,要不然就会把你的json字符串数据按照都厚给切分开了。我刚刚用descriptor试验了一下,没有问题。你可以试试。 下面是我整个PyFlink读取json串进来然后解析数据中time字段的作业 def str_func(str_param): import json return json.loads(str_param)['time'] s_env = StreamExecutionEnvironment.get_execution_environment() s_env.set_parallelism(1) s_env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime) st_env = StreamTableEnvironment.create(s_env) result_file = "/tmp/slide_row_window_streaming.csv" if os.path.exists(result_file): os.remove(result_file) st_env \ .connect( # declare the external system to connect to Kafka() .version("0.11") .topic("user") .start_from_earliest() .property("zookeeper.connect", "localhost:2181") .property("bootstrap.servers", "localhost:9092") ) \ .with_format( # declare a format for this system Csv() .schema(DataTypes.ROW( [DataTypes.FIELD("a", DataTypes.STRING()) ])) .field_delimiter('\n') ) \ .with_schema( # declare the schema of the table Schema() .field("a", DataTypes.STRING()) ) \ .in_append_mode() \ .register_table_source("source") st_env.register_function( "str_func", udf(str_func, [DataTypes.STRING()], DataTypes.STRING())) st_env.register_table_sink("sink", CsvTableSink(["a"], [DataTypes.STRING()], result_file)) st_env.scan("source").select("str_func(a)").insert_into("sink") kafka里面的数据 {"a": "a", "b": 1, "c": 1, "time": "2013-01-01T00:14:13Z"} {"a": "b", "b": 2, "c": 2, "time": "2013-01-01T00:24:13Z"} {"a": "a", "b": 3, "c": 3, "time": "2013-01-01T00:34:13Z"} {"a": "a", "b": 4, "c": 4, "time": "2013-01-01T01:14:13Z"} {"a": "b", "b": 4, "c": 5, "time": "2013-01-01T01:24:13Z"} {"a": "a", "b": 5, "c": 2, "time": "2013-01-01T01:34:13Z"} 最后Csv里面的结果数据为 2013-01-01T00:14:13Z 2013-01-01T00:24:13Z 2013-01-01T00:34:13Z 2013-01-01T01:14:13Z 2013-01-01T01:24:13Z 2013-01-01T01:34:13Z Best, Xingbo 刘亚坤 <[hidden email]> 于2020年5月29日周五 下午2:17写道: > 目前在学习使用pyflink的Table api,请教一个问题: > 1、Table Api连接kafka系统,能否把整条的kafka消息看成是一个table字段进行处理?比如,kafka > topic连的消息为一个json字符串,把这个字符串整体当做是一个字段,这样可以方便使用 pyflink 的udf函数对消息进行处理转换等操作? > 2、如果以上可行,连接kafka的数据格式如何设置,即with_format如何设置,目前官网这方便的资料较少。 > > 新手入门,请多指教,感谢。 > |
Free forum by Nabble | Edit this page |