[DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

[DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

dwysakowicz

Hi devs,

When working on improving the Table API/SQL connectors we faced a few shortcomings of the DeserializationSchema and SerializationSchema interfaces. Similar features were also mentioned by other users in the past. The shortcomings I would like to address with the FLIP include:

  • Emitting 0 to m records from the deserialization schema with per-partition watermarks
    • https://github.com/apache/flink/pull/3314#issuecomment-376237266
    • differentiating a null value from no value
    • support for the Debezium CDC format (https://cwiki.apache.org/confluence/display/FLINK/FLIP-105%3A+Support+to+Interpret+and+Emit+Changelog+in+Flink+SQL)
  • A way to initialize the schema
    • establish external connections
    • generate code on startup
    • no need for lazy initialization
  • Access to metrics [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Custom-Metrics-outside-RichFunctions-td32282.html#a32329]
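To make the discussion concrete, here is a rough sketch of the additions to DeserializationSchema as currently drafted in the FLIP (the exact names, e.g. InitializationContext, are still open for discussion):

public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    // called once before the first deserialize() call; gives access to e.g. a
    // metric group and allows establishing connections or generating code on startup
    default void open(InitializationContext context) throws Exception {
    }

    // the existing single-record method remains the minimal contract
    T deserialize(byte[] message) throws IOException;

    // new: may emit 0 to m records for one input message via the Collector;
    // the default forwards to the old method and drops null results
    default void deserialize(byte[] message, Collector<T> out) throws IOException {
        T deserialized = deserialize(message);
        if (deserialized != null) {
            out.collect(deserialized);
        }
    }

    boolean isEndOfStream(T nextElement);

    default void close() throws Exception {
    }
}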

One important aspect I would like to hear your opinion on is how to support the Collector interface in the Kafka source, assuming of course that we agree to add the Collector to the DeserializationSchema in the first place.

The FLIP can be found here: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988&src=contextnavpagetreemode

Looking forward to your feedback.

Best,

Dawid



Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

Timo Walther-2
Hi Dawid,

thanks for this FLIP. It solves a lot of issues with the current
design for both Flink contributors and users. +1 for this.

Some minor suggestions from my side:
- How about finding something shorter for `InitializationContext`? Maybe
just `OpenContext`?
- While introducing default methods for existing interfaces, shall we
also create contexts for those methods? I see the following method in
your FLIP and wonder if we can reduce the number of parameters while
introducing a new method:

deserialize(
    byte[] recordValue,
    String partitionKey,
    String seqNum,
    long approxArrivalTimestamp,
    String stream,
    String shardId,
    Collector<T> out)

to:

deserialize(
    byte[] recordValue,
    Context c,
    Collector<T> out)
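
The Context could then simply bundle the per-record metadata (just a sketch, the names are illustrative):

interface Context {
    String getPartitionKey();
    String getSequenceNumber();
    long getApproximateArrivalTimestamp();
    String getStream();
    String getShardId();
}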

What do you think?

Regards,
Timo




Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

Seth Wiesman-4
I would be in favor of buffering data outside of the checkpoint lock. In my
experience, serialization is always the biggest performance killer in user
code and I have a hard time believing in practice that anyone is going to
buffer so many records that it causes real memory concerns.

To add to Timo's point,

Statefun actually did that on its Kinesis ser/de interfaces[1,2].

Seth

[1]
https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/ingress/KinesisIngressDeserializer.java
[2]
https://github.com/apache/flink-statefun/blob/master/statefun-kinesis-io/src/main/java/org/apache/flink/statefun/sdk/kinesis/egress/KinesisEgressSerializer.java



Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

Jark Wu-2
Hi Dawid,

Thanks for driving this. This is a blocker to support Debezium CDC format
(FLIP-105). So big +1 from my side.

Regarding emitting multiple records and checkpointing, I'm also in favor
of option #1: buffer all the records outside of the checkpoint lock.
I think most use cases will not buffer more data than the deserialized
byte[] itself.

I have a minor suggestion on DeserializationSchema: could we have a default
implementation (maybe throwing an exception) for `T deserialize(byte[] message)`?
I think this will not break compatibility, and users then don't have to
implement this deprecated method if they want to use the new
collector interface.
SinkFunction did this the same way: it introduced a new invoke
method with a Context parameter and gave the old invoke method an
empty implementation.
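
Something along these lines (just a sketch of the idea, not a final proposal):

// hypothetical default for the old method, so that implementors of the
// new Collector-based variant do not have to override it
default T deserialize(byte[] message) throws IOException {
    throw new UnsupportedOperationException(
            "Implement deserialize(byte[], Collector<T>) instead.");
}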

Best,
Jark


Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

dwysakowicz
Hi all,

@Timo I'm fine with OpenContext.

@Timo @Seth Sure, we can combine all the parameters in a single object.
I will update the FLIP.

@Jark I was aware of the implementation of SinkFunction, but it was a
conscious choice to not do it that way.

Personally I am against giving a default implementation to both the new
and old methods. This results in an interface that by default does
nothing, or only notifies the user at runtime that he/she has not
implemented a method of the interface, which does not sound like good
practice to me. Moreover I believe the method without a Collector will
still be the preferred method for many users. Plus it communicates
explicitly what the minimal functionality required by the interface is.
Nevertheless I am happy to hear other opinions.

@all I also prefer the buffering approach. Let's wait a day or two more
to see if others think differently.

Best,

Dawid


Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

Jark Wu-2
Thanks for the explanation. Sounds good to me.

Best,
Jark


Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

wenlong.lwl
Hi Dawid, thanks for driving this. Big +1 for this FLIP. Currently we have
to implement an internal flat map in connectors, which makes it very
inconvenient to support customized parsers in DDL.

Here are my minor suggestions:
1. Can we support serializing/deserializing from/to a message with a specified
type instead of byte[]? Otherwise we would need a lot of other parser
interfaces similar to KafkaDeserializationSchema/KafkaSerializationSchema
whenever we want to support other kinds of sources, such as RocketMQ,
RabbitMQ, etc.
2. How can we support serializing multiple output records into a compacted
message? Maybe we need a method such as `void flush(Collector output)` to
flush buffered records when checkpointing. A rough sketch of both points
follows below.
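
For both points, something along these lines might work (just a sketch, all names are illustrative):

// 1. a schema generic in the raw message type IN instead of being fixed to byte[]
interface RecordDeserializationSchema<IN, OUT> extends Serializable {
    void deserialize(IN message, Collector<OUT> out) throws IOException;
}

// 2. a flush hook so buffered/compacted output can be forced out on checkpoint
interface BufferingSerializationSchema<IN, OUT> extends Serializable {
    void serialize(IN element, Collector<OUT> out);
    void flush(Collector<OUT> out);
}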


Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

Aljoscha Krettek-2
On 07.04.20 08:45, Dawid Wysakowicz wrote:

> @Jark I was aware of the implementation of SinkFunction, but it was a
> conscious choice to not do it that way.
>
> Personally I am against giving a default implementation to both the new
> and old methods. This results in an interface that by default does
> nothing, or only notifies the user at runtime that he/she has not
> implemented a method of the interface, which does not sound like good
> practice to me. Moreover I believe the method without a Collector will
> still be the preferred method for many users. Plus it communicates
> explicitly what the minimal functionality required by the interface is.
> Nevertheless I am happy to hear other opinions.

Dawid and I discussed this before. I did the extension of the
SinkFunction but by now I think it's better to do it this way, because
otherwise you can implement the interface without implementing any methods.

> @all I also prefer the buffering approach. Let's wait a day or two more
> to see if others think differently.

I'm also in favour of buffering outside the lock.

Also, +1 to this FLIP.

Best,
Aljoscha

Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

SHI Xiaogang
Hi,

I don't think the proposal is a good solution to the problems. I am in
favour of using a ProcessFunction chained to the source/sink function to
serialize/deserialize the records, instead of embedding (de)serialization
schema in source/sink function.

Message packing is heavily used in our production environment to allow
compression and improve throughput. As buffered messages have to be
delivered once a time limit is exceeded, timers are also required in our
case. I think this is a common need for other users as well.

In this proposal, with more and more components added into the context, we
will in the end find that the serialization/deserialization schema is just
another wrapper around ProcessFunction.
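
The alternative I have in mind looks roughly like this (a sketch; kafkaSource, Event and unpack() stand in for our own source, record type and unpacking logic):

// the source emits the raw payloads, a chained ProcessFunction does the parsing
DataStream<byte[]> raw = env.addSource(kafkaSource);
raw.process(new ProcessFunction<byte[], Event>() {
    @Override
    public void processElement(byte[] packed, Context ctx, Collector<Event> out) {
        // unpack the compacted message and emit every contained record
        for (Event event : unpack(packed)) {
            out.collect(event);
        }
    }
});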

Regards,
Xiaogang


Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

Jark Wu-2
Hi Xiaogang,

I think this proposal doesn't conflict with your use case, you can still
chain a ProcessFunction after a source which emits raw data.
But I'm not in favor of chaining ProcessFunction after source, and we
should avoid that, because:

1) For correctness, it is necessary to perform the watermark generation as
early as possible, in order to be close to the actual data generation
within a source's data partition. This is also the purpose of
per-partition watermarks and event-time alignment. Many ongoing FLIPs
(e.g. FLIP-27, FLIP-95) work on this effort. Deserializing records and
generating watermarks in a chained ProcessFunction makes it difficult to
do per-partition watermarks in the future.
2) In Flink SQL, a source should emit the deserialized row instead of raw
data. Otherwise, users have to define raw byte[] as the single column of
the defined table and parse it in queries, which is very inconvenient.

Best,
Jark


Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

Aljoscha Krettek-2
On 10.04.20 17:35, Jark Wu wrote:
> 1) For correctness, it is necessary to perform the watermark generation as
> early as possible, in order to be close to the actual data generation
> within a source's data partition. This is also the purpose of
> per-partition watermarks and event-time alignment. Many ongoing FLIPs
> (e.g. FLIP-27, FLIP-95) work on this effort. Deserializing records and
> generating watermarks in a chained ProcessFunction makes it difficult to
> do per-partition watermarks in the future.

For me, this is the main reason for the proposal, i.e. we need to extract
the records in the source so that we can correctly generate per-partition
watermarks.

Best,
Aljoscha

Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

dwysakowicz
Hi Xiaogang,

I very much agree with Jark's and Aljoscha's responses.



Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

Stephan Ewen
Hi!

Sorry for being a bit late to the party.

One very important thing to consider for "serialization under checkpoint
lock or not" is:
  - If you do it under checkpoint lock, things are automatically correct.
Checkpoint barriers go between original records that correspond to offsets
in the source.
  - If you deserialize outside the checkpoint lock, then you read a record
from the source but may only partially emit it. In that case you need to
store the difference (the not-yet-emitted part) in the checkpoint.

==> I would advise against trying to emit partial records, i.e. doing
things outside the checkpoint lock. FLIP-27 will by default also not do
partial emission of unnested events. Also, it is questionable whether
optimizing this in the source makes sense when no other operator supports
that (flatMap, etc.).

Regarding Seth's comment about performance:
  - For that, it probably does not make much difference whether this happens
under the lock or not, but more whether this can be pushed to another thread
(the source's I/O thread), so that it does not add load to the main task
processing thread.

==> This means that the I/O thread deserializes the "batch" that it hands
over.
==> Still, it is important that all records coming from one original source
record are emitted atomically, otherwise we have the same issue as above.

Best,
Stephan



Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

dwysakowicz

Hi Stephan,

I fully agree with what you said. Also, as far as I can tell, what was suggested in FLIP-124 does not contradict what you are saying. Let me clarify it a bit in case it is not clear in the document.

The current implementations of Kafka and Kinesis already do the deserialization outside of the checkpoint lock, in threads separate from the main processing thread. The approach described as option 1, which had the most supporters, is to keep that behavior. The way I would like to support emitting multiple results in this setup is to let the DeserializationSchema deserialize records into a list (via a collector) that will then be emitted atomically, all at once.

Currently the behavior can be modelled as:

T record = deserializationSchema.deserialize(...);
synchronized (checkpointLock) {
    sourceContext.collect(record);
    updateSourceState(...);
}

and I was suggesting to change it to:

// Collector is an interface, so the source supplies a concrete list-backed
// implementation (called BufferingCollector here purely for illustration)
BufferingCollector<T> out = new BufferingCollector<>();
deserializationSchema.deserialize(..., out);
List<T> deserializedRecords = out.getRecords();
synchronized (checkpointLock) {
    for (T record : deserializedRecords) {
        sourceContext.collect(record);
    }
    updateSourceState(...);
}
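
where the buffering collector could be as simple as the following (a sketch; this class would be internal to the source, not part of the proposed API):

final class BufferingCollector<T> implements Collector<T> {
    private final List<T> buffer = new ArrayList<>();

    @Override
    public void collect(T record) {
        buffer.add(record);
    }

    @Override
    public void close() {
    }

    // drained by the source before emitting the records under the checkpoint lock
    List<T> getRecords() {
        return buffer;
    }
}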

I think that is aligned with your reply to Seth's comment: the "batch" of records originating from one source record is emitted atomically.

Best,

Dawid




Re: [DISCUSS] FLIP-124: Add open/close and Collector to (De)SerializationSchema

Stephan Ewen
That makes sense, thanks for clarifying.

Best,
Stephan

