[jira] [Created] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

Shang Yuanchun (Jira)
Shengkai Fang created FLINK-22969:
-------------------------------------

             Summary: Validate the topic is not null or empty string when create kafka source/sink function
                 Key: FLINK-22969
                 URL: https://issues.apache.org/jira/browse/FLINK-22969
             Project: Flink
          Issue Type: Bug
            Reporter: Shengkai Fang


Add test in UpsertKafkaTableITCase


{code:java}
 @Test
    public void testSourceSinkWithKeyAndPartialValue() throws Exception {
        // we always use a different topic name for each parameterized topic,
        // in order to make sure the topic can be created.
        final String topic = "key_partial_value_topic_" + format;
        createTestTopic(topic, 1, 1); // use single partition to guarantee orders in tests

        // ---------- Produce an event time stream into Kafka -------------------
        String bootstraps = standardProps.getProperty("bootstrap.servers");

        // k_user_id and user_id have different data types to verify the correct mapping,
        // fields are reordered on purpose
        final String createTable =
                String.format(
                        "CREATE TABLE upsert_kafka (\n"
                                + "  `k_user_id` BIGINT,\n"
                                + "  `name` STRING,\n"
                                + "  `timestamp` TIMESTAMP(3) METADATA,\n"
                                + "  `k_event_id` BIGINT,\n"
                                + "  `user_id` INT,\n"
                                + "  `payload` STRING,\n"
                                + "  PRIMARY KEY (k_event_id, k_user_id) NOT ENFORCED"
                                + ") WITH (\n"
                                + "  'connector' = 'upsert-kafka',\n"
                                + "  'topic' = '%s',\n"
                                + "  'properties.bootstrap.servers' = '%s',\n"
                                + "  'key.format' = '%s',\n"
                                + "  'key.fields-prefix' = 'k_',\n"
                                + "  'value.format' = '%s',\n"
                                + "  'value.fields-include' = 'EXCEPT_KEY'\n"
                                + ")",
                        "", bootstraps, format, format);

        tEnv.executeSql(createTable);

        String initialValues =
                "INSERT INTO upsert_kafka\n"
                        + "VALUES\n"
                        + " (1, 'name 1', TIMESTAMP '2020-03-08 13:12:11.123', 100, 41, 'payload 1'),\n"
                        + " (2, 'name 2', TIMESTAMP '2020-03-09 13:12:11.123', 101, 42, 'payload 2'),\n"
                        + " (3, 'name 3', TIMESTAMP '2020-03-10 13:12:11.123', 102, 43, 'payload 3'),\n"
                        + " (2, 'name 2', TIMESTAMP '2020-03-11 13:12:11.123', 101, 42, 'payload')";
        tEnv.executeSql(initialValues).await();

        // ---------- Consume stream from Kafka -------------------

        final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM upsert_kafka"), 5);

        final List<Row> expected =
                Arrays.asList(
                        changelogRow(
                                "+I",
                                1L,
                                "name 1",
                                LocalDateTime.parse("2020-03-08T13:12:11.123"),
                                100L,
                                41,
                                "payload 1"),
                        changelogRow(
                                "+I",
                                2L,
                                "name 2",
                                LocalDateTime.parse("2020-03-09T13:12:11.123"),
                                101L,
                                42,
                                "payload 2"),
                        changelogRow(
                                "+I",
                                3L,
                                "name 3",
                                LocalDateTime.parse("2020-03-10T13:12:11.123"),
                                102L,
                                43,
                                "payload 3"),
                        changelogRow(
                                "-U",
                                2L,
                                "name 2",
                                LocalDateTime.parse("2020-03-09T13:12:11.123"),
                                101L,
                                42,
                                "payload 2"),
                        changelogRow(
                                "+U",
                                2L,
                                "name 2",
                                LocalDateTime.parse("2020-03-11T13:12:11.123"),
                                101L,
                                42,
                                "payload"));

        assertThat(result, deepEqualTo(expected, true));

        // ------------- cleanup -------------------

        deleteTestTopic(topic);
    }
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)