sandy du created FLINK-22541:
--------------------------------
Summary: add json format filter params
Key: FLINK-22541
URL:
https://issues.apache.org/jira/browse/FLINK-22541 Project: Flink
Issue Type: Improvement
Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.12.0, 1.11.0
Reporter: sandy du
In my case,one kafka topic multiple table data,for example:
{"id":"121","source":"users","content":\{"name":"test01","age":20,"addr":"addr1"}}
{"id":"122","source":"users","content":\{"name":"test02","age":23,"addr":"addr2"}}
{"id":"124","source":"users","content":\{"name":"test03","age":34,"addr":"addr3"}}
{"id":"124","source":"order","content":\{"orderId":"100001","price":34,"addr":"addr1231"}}
{"id":"125","source":"order","content":\{"orderId":"100002","price":34,"addr":"addr1232"}}
{"id":"126","source":"order","content":\{"orderId":"100003","price":34,"addr":"addr1233"}}
I just want to consume data from talbe order,flink sql ddl like this:
CREATE TABLE order (
orderId STRING,
age INT,
addr STRING
)
with (
'connector'='kafka',
'topic'='kafkatopic',
'properties.bootstrap.servers'='localhost:9092',
'properties.group.id'='testGroup',
'scan.startup.mode'='earliest-offset',
'format'='json',
'path-fliter'='$[?(@.source=="order")]',
'path-data'='$.content'
);
path-fliter and path-data can use JsonPath (
https://github.com/json-path/JsonPath)
--
This message was sent by Atlassian Jira
(v8.3.4#803005)