Fabian Hueske created FLINK-19790:
-------------------------------------
Summary: Writing MAP<STRING, STRING> to Kafka with JSON format produces incorrect data.
Key: FLINK-19790
URL: https://issues.apache.org/jira/browse/FLINK-19790
Project: Flink
Issue Type: Bug
Components: Table SQL / Ecosystem
Affects Versions: 1.11.2
Reporter: Fabian Hueske
Running the following SQL script writes incorrect data to Kafka:
{code:java}
CREATE TEMPORARY TABLE tmp_1 (m MAP<STRING, STRING>) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  'properties.bootstrap.servers' = '...',
  'properties.group.id' = '...',
  'topic' = 'tmp-1'
);

CREATE TEMPORARY TABLE gen (k STRING, v STRING) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY VIEW gen_short AS
SELECT SUBSTR(k, 0, 4) AS k, SUBSTR(v, 0, 4) AS v FROM gen;

INSERT INTO tmp_1
SELECT MAP[k, v] FROM gen_short;
{code}
Printing the content of the {{tmp-1}} topic results in the following output:
{code:java}
$ kafka-console-consumer --bootstrap-server ... --from-beginning --topic tmp-1 | head -n 5
{"m":{"8a93":"6102"}}
{"m":{"8a93":"6102","7922":"f737"}}
{"m":{"8a93":"6102","7922":"f737","9b63":"15b0"}}
{"m":{"8a93":"6102","7922":"f737","9b63":"15b0","c38b":"b55c"}}
{"m":{"8a93":"6102","7922":"f737","9b63":"15b0","c38b":"b55c","222c":"f3e2"}}
{code}
As you can see, the maps are not correctly encoded as JSON: each record accumulates the entries of all previously written maps instead of containing just the single key-value pair produced by {{MAP[k, v]}}.
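For comparison, the expected output should contain exactly one entry per record (using the keys and values from the output above):
{code:java}
{"m":{"8a93":"6102"}}
{"m":{"7922":"f737"}}
{"m":{"9b63":"15b0"}}
{code}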
I've run the query with the Blink planner, with both object reuse and operator chaining disabled.
Writing with Avro works as expected.
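The Avro comparison used an analogous table definition with only the format changed, roughly as sketched below (topic name and exact options are illustrative):
{code:java}
CREATE TEMPORARY TABLE tmp_1_avro (m MAP<STRING, STRING>) WITH (
  'connector' = 'kafka',
  'format' = 'avro',
  'properties.bootstrap.servers' = '...',
  'properties.group.id' = '...',
  'topic' = 'tmp-1-avro'
);

INSERT INTO tmp_1_avro
SELECT MAP[k, v] FROM gen_short;
{code}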
Hence, I assume that the JSON encoder/serializer reuses the map object across records when encoding them.
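To illustrate the suspicion, here is a minimal, hypothetical sketch in plain Java (not the actual Flink JSON serializer code): a serializer that keeps one mutable map across records and never clears it produces exactly the accumulation pattern shown above.
{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical illustration of the suspected bug: the serializer reuses one
 * mutable Map instance across records without clearing it, so every record
 * accumulates the entries of all previously serialized records.
 * This is NOT the actual Flink JSON format code.
 */
public class MapReuseIllustration {

    // Reused across invocations -- the suspected source of the accumulation.
    private final Map<String, String> reusedMap = new LinkedHashMap<>();

    String serialize(String key, String value) {
        // A missing reusedMap.clear() here leads to the observed behavior.
        reusedMap.put(key, value);
        StringBuilder sb = new StringBuilder("{\"m\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : reusedMap.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append('"').append(e.getKey()).append("\":\"").append(e.getValue()).append('"');
            first = false;
        }
        return sb.append("}}").toString();
    }

    public static void main(String[] args) {
        MapReuseIllustration s = new MapReuseIllustration();
        System.out.println(s.serialize("8a93", "6102"));
        System.out.println(s.serialize("7922", "f737"));
        System.out.println(s.serialize("9b63", "15b0"));
    }
}
{code}
Running this prints the same accumulating records as the console output above.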