sean.miao created FLINK-10119:
---------------------------------
Summary: 存在数据非json格式,使用KafkaJsonTableSource的话,job无法拉起。
Key: FLINK-10119
URL:
https://issues.apache.org/jira/browse/FLINK-10119 Project: Flink
Issue Type: Bug
Components: Kafka Connector
Affects Versions: 1.5.1
Environment: 无
Reporter: sean.miao
开启checkpoint和savepoint,同时开启了job的自动拉起。
flink从kafka消费数据,使用的是Kafka010JsonTableSource。发现只要有一条数据非json格式,就会导致应用挂掉无法拉起。
当前,这仅是满足了处理语义,但是导致应用不可以用就不太好了吧。能不能改成像spark sql一样,不满足格式的数据,增加到一个专门存储无法解析的数据的列里面。
我们目前的做法是
JsonRowDeserializationSchema
@Override
public Row deserialize(byte[] message) throws IOException {
try {
final JsonNode root = objectMapper.readTree(message);
return convertRow(root, (RowTypeInfo) typeInfo);
} catch (Throwable t) {
throw new IOException("Failed to deserialize JSON object.", t);
}
}
catch 里抛异常改成了传入一个 “{}”,会使得所有不能解析数据给所有列返回空值。
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)