[jira] [Created] (FLINK-20763) canal format parse update record with null value get wrong result

WangRuoQi created FLINK-20763:
---------------------------------

             Summary: canal format parse update record with null value get wrong result
                 Key: FLINK-20763
                 URL: https://issues.apache.org/jira/browse/FLINK-20763
             Project: Flink
          Issue Type: Bug
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.11.2
            Reporter: WangRuoQi
         Attachments: canal_format.patch

When I use the canal format to consume a MySQL binlog and run a query like this over it:
{code:java}
select ymd,count(order_no),count(*) from order_table where status>=3 group by ymd;{code}
I get results like this:
{code:java}
(20201212,10,10)
..
(20201212,20,24)
..
(20201212,100,130)
..{code}
I am sure that when status>=3 every record has a valid order_no, yet I got results where count(order_no) and count(*) differ.

I found the following while debugging.
{code:java}
insert into order_table(ymd,order_no,status) values(20201212,null,1);
-- +I(20201212,null,1)
update order_table set order_no=123,status=3 where id=1;
-- -U(20201212,123,1)
-- +U(20201212,123,3){code}
So I noticed that the canal format has a bug when parsing update records.
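
For reference, the update above roughly corresponds to a canal-json message like the following (an illustrative, heavily simplified payload; only the relevant fields are shown). The "old" array lists the changed columns with their previous values, so order_no appears there with an explicit null:
{code:json}
{
  "data": [ { "ymd": "20201212", "order_no": "123", "status": "3" } ],
  "old":  [ { "order_no": null, "status": "1" } ],
  "type": "UPDATE"
}
{code}
After JSON deserialization, this explicit null in "old" cannot be told apart from a column that is simply absent from "old", which is the root of the problem.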

 

The source code logic is
{code:java}
} else if (OP_UPDATE.equals(type)) {
   // "data" field is an array of row, contains new rows
   ArrayData data = row.getArray(0);
   // "old" field is an array of row, contains old values
   ArrayData old = row.getArray(1);
   for (int i = 0; i < data.size(); i++) {
      // the underlying JSON deserialization schema always produce GenericRowData.
      GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
      GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
      for (int f = 0; f < fieldCount; f++) {
         if (before.isNullAt(f)) {
            // not null fields in "old" (before) means the fields are changed
            // null/empty fields in "old" (before) means the fields are not changed
            // so we just copy the not changed fields into before
            before.setField(f, after.getField(f));
         }
      }
      before.setRowKind(RowKind.UPDATE_BEFORE);
      after.setRowKind(RowKind.UPDATE_AFTER);
      out.collect(before);
      out.collect(after);
   }
}
{code}
When a field in "old" has a null value, it is overwritten by the value from the new record. That leads the aggregation to a wrong result.

 

I tried to fix this bug with the following logic.

For each field, use the old value when the old row contains this field, whether it is null or not; use the new value by default. A minimal sketch of this idea follows (it is not the attached patch; it assumes the deserializer still has access to the raw JSON objects of the "old" array, e.g. as a Jackson ObjectNode, and the class, helper name and signature are hypothetical):
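{code:java}
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.types.RowKind;

import com.fasterxml.jackson.databind.node.ObjectNode;

public final class CanalUpdateBeforeBuilder {

    /**
     * Builds the UPDATE_BEFORE row for one element of "data"/"old".
     *
     * @param after      row parsed from the i-th element of "data" (new values)
     * @param oldRow     row parsed from the i-th element of "old" (changed old values)
     * @param oldJson    raw i-th JSON object of "old", used only to test key presence
     * @param fieldNames physical field names, in the same order as the row fields
     */
    public static GenericRowData buildBefore(
            GenericRowData after,
            GenericRowData oldRow,
            ObjectNode oldJson,
            String[] fieldNames) {
        GenericRowData before = new GenericRowData(after.getArity());
        for (int f = 0; f < after.getArity(); f++) {
            if (oldJson.has(fieldNames[f])) {
                // the column is listed in "old", so it changed: keep the old value,
                // even when that old value is an explicit null
                before.setField(f, oldRow.getField(f));
            } else {
                // the column is absent from "old", so it did not change: use the new value
                before.setField(f, after.getField(f));
            }
        }
        before.setRowKind(RowKind.UPDATE_BEFORE);
        return before;
    }
}
{code}
The key difference from the current code is that presence of the key in the raw "old" object, not nullness of the parsed row, decides whether a field changed.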

I hope this bug will be fixed in a future version.

[^canal_format.patch]

 


