[DISCUSS] Introduce partitioning strategies to Table/SQL

[DISCUSS] Introduce partitioning strategies to Table/SQL

Jingsong Li
Hi all,

## Motivation

FLIP-63 [1] introduced initial support for the PARTITIONED BY clause, to the
extent that it lets us support Hive's partitioning.
But this partition definition is completely specific to Hive/file
systems. With the continuous development of the system, new
requirements have come up:

- FLIP-107 [2] requirements: A common requirement is to create a custom
partitioning of the data. We should have a way to specify/compute the target
partition/shard for Kinesis/Pravega/Pulsar. In those cases it would be the
only way to control partitioning.

- Apache Iceberg partitioning [3] requirements: Iceberg produces partition
values by taking a column value and optionally transforming it. Iceberg is
responsible for converting event_time into event_date, and keeps track of
the relationship.

So I think it is better to introduce partitioning strategies to Flink,
similar to partitioning in traditional databases like Oracle [4].

## Proposed Partitioning DDL

Hash Partitioning Tables:

CREATE TABLE kafka_table (
  id STRING,
  name STRING,
  `date` DATE ... )
PARTITIONED BY (HASH(id, name))
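With such a clause, a plain INSERT is all the user writes; the connector
would derive the target Kafka partition from the declared strategy. A
minimal sketch of the intended write path, ignoring the elided columns
(`some_source` is a hypothetical table; the actual routing would be left
to the connector):

INSERT INTO kafka_table
SELECT id, name, `date` FROM some_source;
-- rows with the same (id, name) hash would land in the same Kafka partition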

Explicit Partitioning Tables (Introduced in FLIP-63):

CREATE TABLE fs_table (
  name STRING,
  `date` DATE ... )
PARTITIONED BY (`date`)

(Could we drop the parentheses when there is only a single partition level? =>
"PARTITIONED BY HASH(id, name)" and "PARTITIONED BY `date`")

Composite Partitioning Tables:

CREATE TABLE fs_table (
  name STRING,
  `date` DATE
   ... )
PARTITIONED BY (year(`date`), month(`date`), day(`date`))
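One benefit worth spelling out: because the system would keep track of the
relationship between `date` and the derived partitions (as Iceberg does [3]),
a filter on the source column alone could be pruned down to the matching
partitions. A sketch of such a query (whether and how pruning happens would
be connector-specific):

SELECT name
FROM fs_table
WHERE `date` >= DATE '2020-03-01' AND `date` < DATE '2020-04-01';
-- a transform-aware source could read only the partitions covering March 2020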

Composite Explicit Partitioning Tables (Introduced in FLIP-63):

CREATE TABLE fs_table (
  name STRING,
  `date` DATE,
  y STRING,
  m STRING,
  d STRING,
   ... )
PARTITIONED BY (y, m, d)
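For contrast with the transform variant above: here every writer has to
derive y, m, and d by hand, and the system cannot know that they are
functions of `date`. A sketch of such a manual write, ignoring the elided
columns (`some_source` is a placeholder):

INSERT INTO fs_table
SELECT
  name,
  `date`,
  CAST(YEAR(`date`) AS STRING),
  CAST(MONTH(`date`) AS STRING),
  CAST(DAYOFMONTH(`date`) AS STRING)
FROM some_source;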

## Rejected Alternatives

Composite Partitioning Tables DDL like Oracle:

CREATE TABLE invoices (
  invoice_no    NUMBER NOT NULL,
  invoice_date  DATE   NOT NULL,
  comments      VARCHAR2(500))
PARTITION BY RANGE (invoice_date)
SUBPARTITION BY HASH (invoice_no)
SUBPARTITIONS 8 (
  PARTITION invoices_q1 VALUES LESS THAN (TO_DATE('01/04/2001', 'DD/MM/YYYY')),
  PARTITION invoices_q2 VALUES LESS THAN (TO_DATE('01/07/2001', 'DD/MM/YYYY')),
  PARTITION invoices_q3 VALUES LESS THAN (TO_DATE('01/09/2001', 'DD/MM/YYYY')),
  PARTITION invoices_q4 VALUES LESS THAN (TO_DATE('01/01/2002', 'DD/MM/YYYY')));

- First, multi-level partitioning is a common thing in big data systems.
- Second, the "SUBPARTITIONS" syntax is not only more complex, but also
completely different from big data systems such as Hive. Big data systems
need to specify less partition information than traditional ones, so it is
more natural to write all partitions within a single pair of parentheses.
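For comparison, the same table in the proposed single-bracket form might
look as follows (a sketch only; named range partitions as in the Oracle DDL
have no counterpart in this proposal, so the range dimension is reduced to
an identity partition):

CREATE TABLE invoices (
  invoice_no   INT NOT NULL,
  invoice_date DATE NOT NULL,
  comments     STRING)
PARTITIONED BY (invoice_date, HASH(invoice_no))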

## Other Interface changes

This change will touch many Catalog / Table related interfaces: the
previous `List<String> partitionKeys` will need to be replaced with a
representation of the partitioning strategies.

What do you think?

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
[3] http://iceberg.apache.org/partitioning/
[4] https://oracle-base.com/articles/8i/partitioned-tables-and-indexes

Best,
Jingsong

Re: [DISCUSS] Introduce partitioning strategies to Table/SQL

Konstantin Knauf
Hi Jingsong,

I would like to understand this FLIP (?) a bit better, but I am missing
some background, I believe. So, some basic questions:

1) Does the PARTITION BY clause only have an effect for sink tables,
defining how data should be partitioned in the sink system, or does it also
make a difference for source tables? My understanding is that it also makes
a difference for source tables (e.g. if the source system
supports partition pruning). I suppose, for source tables, Flink does not
check/enforce this, but trusts that the partitioning information is
correct?!

2) I suppose it is up to the connector implementation whether/how to
interpret the partition information. How will this work?

3) For Kafka, I suppose, the most common partitioning strategy is by key.
FLIP-107 contains a proposal on how to define the key (which fields of the
schema should become part of the key) when writing to Kafka via Flink SQL.
How does this relate to the PARTITION BY clause?

Thanks,

Konstantin



--

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk

Re: [DISCUSS] Introduce partitioning strategies to Table/SQL

Timo Walther
Hi Jingsong,

I haven't looked at your proposal yet, but I think it makes sense to have a
separate FLIP for the partitioning topic. I'm currently working on an
update to FLIP-107 and would suggest removing the partitioning topic
there. FLIP-107 will only focus on accessing metadata and expressing
key/value formats.

What do you think?

Regards,
Timo

On 01.09.20 07:39, Jingsong Li wrote:

> Thanks Konstantin and Benchao for your responses.
>
> If we want to push the implementation forward, it should be a FLIP.
>
> My original intention was to unify the partition definitions for batch
> and streaming:
>
> - What is a "PARTITION" of a table? Partitions define the physical storage
> form of a table. Different partitions should be stored in different places,
> and there should be good isolation between them. Therefore, at the
> connector level, we can operate on each partition separately.
> - For a Kafka table, the partition is a finite integer value, depending
> on how many partitions the Kafka topic has.
> - For a filesystem table, the partition is a directory structure, which
> may be multi-level. And it can be of any type, because any type
> can be converted to a string.
>
> So, how do we generate a partition value? It can be directly mapped from a
> field (identity). In addition, the partition value can also be generated by
> a function (transform); this is what I want to discuss with you.
>
> ## To Konstantin:
>
>> 1) Does the PARTITION BY clause only have an effect for sink tables,
>> defining how data should be partitioned in the sink system, or does it
>> also make a difference for source tables? My understanding is that it also
>> makes a difference for source tables (e.g. if the source system supports
>> partition pruning). I suppose, for source tables, Flink does not
>> check/enforce this, but trusts that the partitioning information is
>> correct?!
>
> Yes, it also works for sources. In my understanding, partition
> pruning is actually a kind of filter push-down: the source is
> responsible for reading only the data of specific partitions.
>
>> 2) I suppose it is up to the connector implementation whether/how to
>> interpret the partition information. How will this work?
>
> I think it depends on the interface and implementation. For Kafka, for
> example, partition fields can be defined in the DDL, and filtering
> conditions on those partition fields can be expressed in the query;
> the Kafka source can then act on these filter conditions.
>
>> 3) For Kafka, I suppose, the most common partitioning strategy is by key.
>> FLIP-107 contains a proposal on how to define the key (which fields of the
>> schema should become part of the key) when writing to Kafka via Flink SQL.
>> How does this relate to the PARTITION BY clause?
>
> How a partition is defined should have nothing to do with whether the
> fields it refers to are part of the key. Users can refer to any fields in
> the schema.
>
> ## To Benchao:
>
> I think you are talking about shuffling (keyBy) at the computation
> level? That's a good topic, but this discussion is mainly about partitions
> in connector storage.
>
> Best,
> Jingsong
>
> On Tue, Sep 1, 2020 at 10:51 AM Benchao Li <[hidden email]> wrote:
>
>> Hi Jingsong,
>>
>> Thanks for bringing up this discussion. I like this idea in general.
>> I'd like to add some cases we have met in our scenarios.
>>
>> ## Source Partition By
>> There is a use case where users want to do some lookup in a UDF,
>> much like a dimension table. It's common for them to cache some
>> lookup results in their UDF. Without a 'partition by' from the source,
>> they may need to cache more data in each subtask.
>> Actually, what they need is `keyBy` from the DataStream world.
>> We already support this feature in our production environment.
>>
>> ## Sink Partition By
>> Recently we also received some requirements for this feature.
>> Users want to add their custom sink and need the data shuffled before
>> the sink. They will do something with the data that shares the same
>> partition key.
>>
>> As an addition to the 'Source Partition By' semantics: actually, it is
>> not enough for current use cases. The more common way to do this is to
>> add partition-by semantics to a 'view'; then users can do the 'keyBy'
>> multiple times in one query.
>>
>> I have no strong opinions about these features; I just wanted to add
>> some use cases and would like to hear more opinions about them.
>>
>> Best,
>> Benchao Li


Re: [DISCUSS] Introduce partitioning strategies to Table/SQL

Jingsong Li
Thanks Timo for working on FLIP-107.

Agreed, I think that is a good idea.
I'll spend more time later to write up a detailed FLIP.

Best,
Jingsong

--
Best, Jingsong Lee

Re: [DISCUSS] Introduce partitioning strategies to Table/SQL

Benchao Li
Hi Jingsong,

Thanks for the clarification, and sorry for misunderstanding your original
intention. What I was talking about is indeed another topic; we can leave
it for the future and see whether other people have the same scenarios.


--

Best,
Benchao Li