What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

Tzu-Li Tai
Hi devs,

I need a little help clarifying what the arguments "topic" and "offset" are used for in KeyedDeserializationSchema.deserialize(). The main issue is that I'm currently implementing the Flink Kinesis Consumer. Kafka offsets increase incrementally starting from 0. Kinesis offsets, by contrast, are numbers so large that they can only be stored in BigIntegers, and they generally don't increment by 1 between data records.
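A small check shows why the offset type matters. The sketch below uses a hypothetical helper, `OffsetCheck`, which is not part of Flink or the Kinesis client. It parses a decimal offset string with `java.math.BigInteger`, tests whether the value would fit in a signed 64-bit `long` the way a Kafka offset does, and compares two offsets numerically.

```java
import java.math.BigInteger;

// Hypothetical helper for illustration only; not part of Flink or any Kinesis library.
final class OffsetCheck {

    private OffsetCheck() {}

    /** Returns true if the decimal offset string fits in a signed 64-bit long. */
    static boolean fitsInLong(String offset) {
        // bitLength() excludes the sign bit, so a long can hold values of at most 63 bits
        return new BigInteger(offset).bitLength() <= 63;
    }

    /** Orders two decimal offsets numerically, regardless of how many digits they have. */
    static int compareOffsets(String a, String b) {
        return new BigInteger(a).compareTo(new BigInteger(b));
    }
}
```

Comparing such offsets as plain strings would misorder values of different lengths. For example, "99" sorts after "100" lexicographically, which is one reason to parse the offsets into BigInteger before ordering them.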

I just need to make sure that I won't mess things up with these two values. A pointer to any part of the codebase where I can see how Flink uses "topic" and "offset" in the deserialization schema would be perfect.

Many thanks in advance!

Cheers,
Gordon

Re: What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

mxm
Hi Gordon,

You may use "topic" and "offset" for whatever you like. Note that this
is just an interface. If it does not work for your Kinesis adapter,
you may create a new interface. For existing usage of the
KeyedDeserializationSchema, please have a look at the
FlinkKafkaConsumer.
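The following minimal sketch illustrates Max's point that the schema only receives `topic` and `offset` and leaves their use to the implementer. `SimpleKeyedSchema` is a simplified, hypothetical stand-in loosely modeled on the method discussed in this thread. It is not Flink's actual interface, which also reports the produced type, for example. `TaggedStringSchema` is an illustrative user implementation that records where each value came from.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-in for a keyed deserialization schema (not the Flink API).
interface SimpleKeyedSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, long offset) throws IOException;

    boolean isEndOfStream(T nextElement);
}

// Illustrative user schema: the consumer just passes topic and offset in,
// and this implementation chooses to prefix them to the decoded value.
final class TaggedStringSchema implements SimpleKeyedSchema<String> {

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, long offset) {
        String value = new String(message, StandardCharsets.UTF_8);
        return topic + "@" + offset + ": " + value;
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // an unbounded stream never signals its end
    }
}
```

A consumer would call `deserialize` once per fetched record and pass its own notion of topic and offset. Nothing else in this sketch reads those values back.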

Cheers,
Max

On Tue, Jan 19, 2016 at 11:27 AM, Tzu-Li (Gordon) Tai
<[hidden email]> wrote:

> Hi devs,
>
> I need a little help clarifying what the arguments "topic" and
> "offset" are used for in KeyedDeserializationSchema.deserialize(). The main
> issue is that I'm currently implementing the Flink Kinesis Consumer.
> Kafka offsets increase incrementally starting from 0. Kinesis offsets,
> by contrast, are numbers so large that they can only be stored in
> BigIntegers, and they generally don't increment by 1 between data records.
>
> I just need to make sure that I won't mess things up with these two
> values. A pointer to any part of the codebase where I can see how Flink
> uses "topic" and "offset" in the deserialization schema would be perfect.
>
> Many thanks in advance!
>
> Cheers,
> Gordon
>
>
>
> --
> View this message in context: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/What-is-the-topic-offset-used-for-in-KeyedDeserializationSchema-deserialize-tp9911.html
> Sent from the Apache Flink Mailing List archive. mailing list archive at Nabble.com.

Re: What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

Tzu-Li Tai
Hi Max,

Thanks for the quick response and clarification :)
I got a bit confused and thought that Flink internals would be accessing this interface too.

Cheers,
Gordon

Re: What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

Robert Metzger
Hi Gordon,

Thank you for starting the discussion. I think the KeyedDeserializationSchema
is in fact located in the wrong package. Its methods are very Kafka-specific,
so maybe I should move it to the Kafka module.

What would the DeserializationSchema for Kinesis look like? Does the Kinesis
API return byte[]? Is there any other information that would be useful for
users?


On Tue, Jan 19, 2016 at 11:49 AM, Tzu-Li (Gordon) Tai <[hidden email]>
wrote:

> Hi Max,
>
> Thanks for the quick response and clarification :)
> I got a bit confused and thought that Flink internals would be accessing
> this interface too.
>
> Cheers,
> Gordon

Re: What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

Tzu-Li Tai
Hi Robert,

+1 for changing where the KeyedDeserializationSchema is located. I was just starting to wonder how I should name the Kinesis DeserializationSchema if I were to create another one in the same package.

For Kinesis, the API returns a String for the key, a byte[] for the value, a String for the streamName (similar to a Kafka topic), and a String for the offset. So I would definitely need to create a new DeserializationSchema for this.
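Based on the fields listed above, a Kinesis-specific schema might look roughly like the following sketch. Both `KinesisDeserializationSchemaSketch` and `Utf8KinesisSchema` are hypothetical names invented for illustration. The key point is that the offset is passed as a `String`, so large sequence numbers are not truncated.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical Kinesis-specific schema whose parameters follow the fields listed above:
// String key, byte[] value, String stream name, String offset. Not an actual Flink API.
interface KinesisDeserializationSchemaSketch<T> {
    T deserialize(String recordKey, byte[] recordValue, String streamName, String offset)
            throws IOException;
}

// Illustrative implementation that decodes the value as UTF-8 and keeps the metadata.
final class Utf8KinesisSchema implements KinesisDeserializationSchemaSketch<String> {

    @Override
    public String deserialize(String recordKey, byte[] recordValue, String streamName, String offset) {
        // The offset stays a String, so no precision is lost for very large sequence numbers.
        return streamName + "/" + recordKey + "@" + offset + ": "
                + new String(recordValue, StandardCharsets.UTF_8);
    }
}
```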

For https://issues.apache.org/jira/browse/FLINK-3229, I'll create the required new interfaces in Kinesis-connector-specific packages. I'd be happy to help relocate the current KeyedDeserializationSchema-related interfaces and classes to a Kafka-specific package as a separate issue, if you want =)

Cheers,
Gordon

Re: What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

Robert Metzger
I'll relocate the KeyedDeserializationSchema as part of the Kafka 0.9.0.0
support (it's a pending pull request I'll merge soon).

On Tue, Jan 19, 2016 at 12:20 PM, Tzu-Li (Gordon) Tai <[hidden email]>
wrote:

> Hi Robert,
>
> +1 for changing where the KeyedDeserializationSchema is located. I was
> just starting to wonder how I should name the Kinesis
> DeserializationSchema if I were to create another one in the same package.
>
> For Kinesis, the API returns a String for the key, a byte[] for the value,
> a String for the streamName (similar to a Kafka topic), and a String for
> the offset. So I would definitely need to create a new
> DeserializationSchema for this.
>
> For https://issues.apache.org/jira/browse/FLINK-3229, I'll create the
> required new interfaces in Kinesis-connector-specific packages. I'd be
> happy to help relocate the current KeyedDeserializationSchema-related
> interfaces and classes to a Kafka-specific package as a separate issue,
> if you want =)
>
> Cheers,
> Gordon

Re: What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

Stephan Ewen
While the (de)serialization schema is not used by the Flink internals, I
think the initial idea was to use it across different sources/sinks (like
Kafka, Socket, RabbitMQ, ...).

Would it make sense to have a KafkaDeSerializationSchema that wraps the
common serialization schemata?
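Stephan's wrapping idea can be sketched as an adapter. The interfaces `ValueSchema` and `KafkaSchema` and the class `KafkaSchemaWrapper` below are hypothetical, simplified names and are not Flink API. The wrapper lets any value-only schema be plugged in where a Kafka-specific one is expected, by discarding the key, topic, and offset.

```java
import java.io.IOException;

// Hypothetical common, value-only schema that any source could use.
interface ValueSchema<T> {
    T deserialize(byte[] message) throws IOException;
}

// Hypothetical Kafka-specific schema that also receives Kafka metadata.
interface KafkaSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, long offset) throws IOException;
}

// Adapter: exposes a common ValueSchema through the Kafka-specific interface.
final class KafkaSchemaWrapper<T> implements KafkaSchema<T> {
    private final ValueSchema<T> inner;

    KafkaSchemaWrapper(ValueSchema<T> inner) {
        this.inner = inner;
    }

    @Override
    public T deserialize(byte[] messageKey, byte[] message, String topic, long offset)
            throws IOException {
        // Key, topic, and offset are dropped; only the payload reaches the wrapped schema.
        return inner.deserialize(message);
    }
}
```

The trade-off is visible here. The wrapper can only forward what the common interface accepts, so connector-specific metadata never reaches the wrapped schema.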

On Tue, Jan 19, 2016 at 12:25 PM, Robert Metzger <[hidden email]>
wrote:

> I'll relocate the KeyedDeserializationSchema as part of the Kafka 0.9.0.0
> support (it's a pending pull request I'll merge soon).
>
> On Tue, Jan 19, 2016 at 12:20 PM, Tzu-Li (Gordon) Tai <[hidden email]>
> wrote:
>
> > Hi Robert,
> >
> > +1 for changing where the KeyedDeserializationSchema is located. I was
> > just starting to wonder how I should name the Kinesis
> > DeserializationSchema if I were to create another one in the same package.
> >
> > For Kinesis, the API returns a String for the key, a byte[] for the value,
> > a String for the streamName (similar to a Kafka topic), and a String for
> > the offset. So I would definitely need to create a new
> > DeserializationSchema for this.
> >
> > For https://issues.apache.org/jira/browse/FLINK-3229, I'll create the
> > required new interfaces in Kinesis-connector-specific packages. I'd be
> > happy to help relocate the current KeyedDeserializationSchema-related
> > interfaces and classes to a Kafka-specific package as a separate issue,
> > if you want =)
> >
> > Cheers,
> > Gordon

Re: What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

Tzu-Li Tai
Hi Stephan,

A comment on this: for the KeyedDeserializationSchema, I don't think it is necessary.
As previously explained, the interfaces for the KeyedDeserializationSchema of Kafka and Kinesis can be quite different, and future external systems that we might write connectors for may need their own specific interfaces as well. Wrapper classes around a common KeyedDeserializationSchema don't seem to make sense, since in the end we would still need to expose system-specific interfaces to the user.

It may be reasonable to keep only the simplest DeSerializationSchema interfaces and wrappers in flink-streaming-java. By a simple KeyedDeserializationSchema, I mean deserialize() methods that take only the key as a byte[] and the message as a byte[]. If new connectors require more specific interfaces, they can create them in their own module (flink-connector-*).
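The following minimal sketch shows the kind of simple keyed schema described above, which takes only the key and the message as byte[]. The names `SimpleKeyedDeserializationSchema` and `Utf8KeyValueSchema` are hypothetical. Under this proposal, connector modules (flink-connector-*) would define their own richer interfaces when they need more metadata.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical minimal keyed schema for a shared module:
// it sees only the raw key and message bytes, nothing connector-specific.
interface SimpleKeyedDeserializationSchema<T> {
    T deserialize(byte[] key, byte[] message) throws IOException;
}

// Illustrative implementation: decodes key and message as UTF-8 into a key/value pair.
final class Utf8KeyValueSchema
        implements SimpleKeyedDeserializationSchema<Map.Entry<String, String>> {

    @Override
    public Map.Entry<String, String> deserialize(byte[] key, byte[] message) {
        // A missing key (null) is kept as null rather than decoded.
        String k = key == null ? null : new String(key, StandardCharsets.UTF_8);
        return new AbstractMap.SimpleImmutableEntry<>(k, new String(message, StandardCharsets.UTF_8));
    }
}
```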

Cheers,
Gordon
Reply | Threaded
Open this post in threaded view
|

Re: What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

Robert Metzger
Hi Gordon,

I'll move the KeyedDeserializationSchema to the Kafka module.

On Wed, Jan 20, 2016 at 8:24 AM, Tzu-Li (Gordon) Tai <[hidden email]>
wrote:

> Hi Stephan,
>
> A comment on this: for the KeyedDeserializationSchema, I don't think it is
> necessary.
> As previously explained, the interfaces for the KeyedDeserializationSchema
> of Kafka and Kinesis can be quite different, and future external systems
> that we might write connectors for may need their own specific interfaces
> as well. Wrapper classes around a common KeyedDeserializationSchema don't
> seem to make sense, since in the end we would still need to expose
> system-specific interfaces to the user.
>
> It may be reasonable to keep only the simplest DeSerializationSchema
> interfaces and wrappers in flink-streaming-java. By a simple
> KeyedDeserializationSchema, I mean deserialize() methods that take only the
> key as a byte[] and the message as a byte[]. If new connectors require more
> specific interfaces, they can create them in their own module
> (flink-connector-*).
>
> Cheers,
> Gordon

Re: What is the topic & offset used for in KeyedDeserializationSchema.deserialize()?

Tzu-Li Tai
Hi Robert,

No problem, thanks for the notice!