[jira] [Created] (FLINK-20273) Fix Table api Kafka connector Sink Partitioner Document Error

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-20273) Fix Table api Kafka connector Sink Partitioner Document Error

Shang Yuanchun (Jira)
Shengkai Fang created FLINK-20273:
-------------------------------------

             Summary: Fix Table api Kafka connector Sink Partitioner Document Error
                 Key: FLINK-20273
                 URL: https://issues.apache.org/jira/browse/FLINK-20273
             Project: Flink
          Issue Type: Bug
          Components: Documentation, Table SQL / API
    Affects Versions: 1.12.0
            Reporter: Shengkai Fang


The [doc|https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#sink-partitioning] tells us that the kafka sink uses fixed partitioner by default. However, in my local test, the it uses sticky partitioner to get the record partition id if key is not set.

You can add the test in the {{KafkaTableITCase}}, the code follows
{code:java}
public void testKafkaSourceSinkWithDefaultPartitioner() throws Exception {
                if (isLegacyConnector) {
                        return;
                }
                // we always use a different topic name for each parameterized topic,
                // in order to make sure the topic can be created.
                final String topic = "key_full_value_topic_" + format;
                createTestTopic(topic, 3, 1);

                // ---------- Produce an event time stream into Kafka -------------------
                String groupId = standardProps.getProperty("group.id");
                String bootstraps = standardProps.getProperty("bootstrap.servers");

                // compared to the partial value test we cannot support both k_user_id and user_id in a full
                // value due to duplicate names after key prefix stripping,
                // fields are reordered on purpose,
                // fields for keys and values are overlapping
                final String createSourceTable = String.format(
                                "CREATE TABLE kafkaSource (\n"
                                                + "  `user_id` BIGINT,\n"
                                                + "  `name` STRING,\n"
                                                + "  `partition` INT METADATA"
                                                + ") WITH (\n"
                                                + "  'connector' = 'kafka',\n"
                                                + "  'topic' = '%s',\n"
                                                + "  'properties.bootstrap.servers' = '%s',\n"
                                                + "  'properties.group.id' = '%s',\n"
                                                + "  'scan.startup.mode' = 'earliest-offset',\n"
                                                + "  'format' = '%s'\n"
                                                + ")",
                                topic,
                                bootstraps,
                                groupId,
                                format);
                final String createSinkTable = String.format(
                                "CREATE TABLE kafkaSink (\n"
                                                + "  `user_id` BIGINT,\n"
                                                + "  `name` STRING\n"
                                                + ") WITH (\n"
                                                + "  'connector' = 'kafka',\n"
                                                + "  'topic' = '%s',\n"
                                                + "  'properties.bootstrap.servers' = '%s',\n"
                                                + "  'properties.group.id' = '%s',\n"
                                                + "  'scan.startup.mode' = 'earliest-offset',\n"
                                                + "  'format' = '%s'\n"
                                                + ")",
                                topic,
                                bootstraps,
                                groupId,
                                format);

                tEnv.executeSql(createSourceTable);
                tEnv.executeSql(createSinkTable);

                String initialValues = "INSERT INTO kafkaSink\n"
                                                                        + "VALUES\n"
                                                                        + " (1, 'name 1'),\n"
                                                                        + " (2, 'name 2'),\n"
                                                                        + " (3, 'name 3')";
                tEnv.executeSql(initialValues).await();

                initialValues = "INSERT INTO kafkaSink\n"
                                + "VALUES\n"
                                + " (4, 'name 4'),\n"
                                + " (5, 'name 5'),\n"
                                + " (6, 'name 6')";
                tEnv.executeSql(initialValues).await();

                initialValues = "INSERT INTO kafkaSink\n"
                                + "VALUES\n"
                                + " (7, 'name 7'),\n"
                                + " (8, 'name 8'),\n"
                                + " (9, 'name 9')";
                tEnv.executeSql(initialValues).await();


                // ---------- Consume stream from Kafka -------------------

                final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM kafkaSource"), 9);

                final List<Row> expected = Arrays.asList(
                                Row.of(1L, "name 1", LocalDateTime.parse("2020-03-08T13:12:11.123"), 100L, "payload 1"),
                                Row.of(2L, "name 2", LocalDateTime.parse("2020-03-09T13:12:11.123"), 101L, "payload 2"),
                                Row.of(3L, "name 3", LocalDateTime.parse("2020-03-10T13:12:11.123"), 102L, "payload 3")
                );

                assertThat(result, deepEqualTo(expected, true));

                // ------------- cleanup -------------------

                deleteTestTopic(topic);
        }
{code}
The test will use the kafka default partitioner and sends record to kafka topic. After insert, we can read the record with the parititon id. If it uses the fixed partitioner, all records will has the same partition id. I repeat the test 3 times and the results are
{code:java}
// the first result
<1,name 1,1>
<2,name 2,1>
<3,name 3,1>
<7,name 7,1>
<4,name 4,0>
<5,name 5,0>
<6,name 6,0>
<8,name 8,0>
<9,name 9,0>
// the second result
<1,name 1,1>
<2,name 2,1>
<3,name 3,1>
<4,name 4,0>
<5,name 5,0>
<6,name 6,0>
<7,name 7,0>
<8,name 8,0>
<9,name 9,0>
// the third result
<9,name 9,2>
<1,name 1,0>
<2,name 2,0>
<3,name 3,0>
<4,name 4,0>
<5,name 5,0>
<6,name 6,0>
<7,name 7,1>
<8,name 8,1>
{code}
The last column is the partition-id and we have 3 partitions in the test. The results show the default partitioner is sticky paritioner rather than fixed partitioner.

By the way, the sink partitioning section in the doc only works when the key is null. If we set the key fields, the {{round-robin}} strategy will not work.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)