Jark Wu created FLINK-20374:
-------------------------------
Summary: Wrong result when shuffling changelog stream on non-primary-key columns
Key: FLINK-20374
URL: https://issues.apache.org/jira/browse/FLINK-20374
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Reporter: Jark Wu
This is reported from user-zh ML:
http://apache-flink.147419.n8.nabble.com/flink-1-11-2-cdc-cdc-sql-sink-save-point-job-sink-td8593.html
{code:sql}
CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '1',
'database-name' = 'ai_audio_lyric_task',
'table-name' = 'test'
);
CREATE TABLE status (
`id` INT,
`name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '1',
'database-name' = 'ai_audio_lyric_task',
'table-name' = 'status'
);
-- output
CREATE TABLE test_status (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
`status_name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'xxx',
'index' = 'xxx',
'username' = 'xxx',
'password' = 'xxx',
'sink.bulk-flush.backoff.max-retries' = '100000',
'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
'sink.bulk-flush.max-actions' = '5000',
'sink.bulk-flush.max-size' = '10mb',
'sink.bulk-flush.interval' = '1s'
);
INSERT INTO test_status
SELECT t.*, s.name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id;
{code}
Data in mysql table:
{code}
test:
0, name0, 2020-07-06 00:00:00 , 0
1, name1, 2020-07-06 00:00:00 , 1
2, name2, 2020-07-06 00:00:00 , 1
.....
status
0, status0
1, status1
2, status2
.....
{code}
Operations:
1. Start the job with parallelism=40; the result in the test_status sink is correct:
{code}
0, name0, 2020-07-06 00:00:00 , 0, status0
1, name1, 2020-07-06 00:00:00 , 1, status1
2, name2, 2020-07-06 00:00:00 , 1, status1
{code}
2. Update {{status}} of {{id=2}} record in table {{test}} from {{1}} to {{2}}.
3. The result is now incorrect: the {{id=2}} record is missing from the sink.
The reason is that the planner shuffles the changelog of {{test}} on the {{status}} column, which is not the primary key. The {{-U}} (retraction) and {{+U}} (new row) events produced by one update then hash to different parallel tasks, so their relative order can't be guaranteed and the sink sees a wrong result.
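The race above can be sketched in a few lines. This is a simplified illustration, not Flink code: the event kinds ({{+I}}/{{-U}}/{{+U}}) follow Flink's RowKind semantics, while the task model and the upsert sink are hypothetical stand-ins.

```python
# State of the upsert sink (keyed by primary key `id`) after the
# correct initial insert of the id=2 row with status=1.
sink = {2: {"id": 2, "status": 1}}

# Updating status 1 -> 2 emits two changelog events. Shuffled on the
# non-primary-key `status` column, they hash to DIFFERENT parallel tasks:
update_events = [
    ("-U", {"id": 2, "status": 1}),  # retraction, routed by hash(status=1)
    ("+U", {"id": 2, "status": 2}),  # new row,    routed by hash(status=2)
]

# Because the events travel through different tasks, the sink may receive
# them in either order. Unlucky interleaving: +U arrives before -U.
for kind, row in [update_events[1], update_events[0]]:
    if kind in ("+I", "+U"):
        sink[row["id"]] = row        # upsert by primary key
    else:                            # -U / -D: delete by primary key
        sink.pop(row["id"], None)

print(sink)  # {} -> the late -U deleted the fresh +U; id=2 has vanished
```

Shuffling on the primary key {{id}} instead would route both events of the update to the same task, preserving their order and keeping the sink correct.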
--
This message was sent by Atlassian Jira
(v8.3.4#803005)