[jira] [Created] (FLINK-22541) add json format filter params

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-22541) add json format filter params

Shang Yuanchun (Jira)
sandy du created FLINK-22541:
--------------------------------

             Summary: add json format filter params
                 Key: FLINK-22541
                 URL: https://issues.apache.org/jira/browse/FLINK-22541
             Project: Flink
          Issue Type: Improvement
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.12.0, 1.11.0
            Reporter: sandy du


In my case,one kafka topic  multiple table data,for example:
{"id":"121","source":"users","content":\{"name":"test01","age":20,"addr":"addr1"}}
{"id":"122","source":"users","content":\{"name":"test02","age":23,"addr":"addr2"}}
{"id":"124","source":"users","content":\{"name":"test03","age":34,"addr":"addr3"}}
{"id":"124","source":"order","content":\{"orderId":"100001","price":34,"addr":"addr1231"}}
{"id":"125","source":"order","content":\{"orderId":"100002","price":34,"addr":"addr1232"}}
{"id":"126","source":"order","content":\{"orderId":"100003","price":34,"addr":"addr1233"}}
 
I  just want to consume data from  talbe order,flink sql ddl like this:
CREATE TABLE order (
orderId STRING,
age INT,
addr STRING
)
with (
'connector'='kafka',
'topic'='kafkatopic',
'properties.bootstrap.servers'='localhost:9092',
'properties.group.id'='testGroup',
'scan.startup.mode'='earliest-offset',
'format'='json',
'path-fliter'='$[?(@.source=="order")]',
'path-data'='$.content'
);
 
path-fliter and path-data can use  JsonPath (https://github.com/json-path/JsonPath)
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)