Hi devs,
I need a little help clarifying what the arguments "topic" and "offset" are used for in KeyedDeserializationSchema.deserialize(). The main issue is that I'm currently in the process of implementing the Flink Kinesis Consumer, and Kinesis offsets, unlike Kafka offsets which are incremental starting from 0, are numbers that can only be stored in BigIntegers and generally don't increment by 1 between data records. I just need to make sure that I won't be messing things up with these two values. A pointer to any part of the codebase where I can see how Flink uses "topic" and "offset" in the deserialization schema would be perfect.

Many thanks in advance!

Cheers,
Gordon
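For context, the interface in question looks roughly like the following in the Kafka connector of later Flink versions (the exact signature varies between versions; in particular, the int partition parameter may not yet exist in the version discussed here):

    import java.io.IOException;
    import java.io.Serializable;
    import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

    public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

        // Deserializes one record; "topic" and "offset" only describe where the record
        // was read from, and how (or whether) to use them is up to the implementation.
        T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
                throws IOException;

        // Returning true tells the consumer to stop once this element has been seen.
        boolean isEndOfStream(T nextElement);
    }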
Hi Gordon,
You may use "topic" and "offset" for whatever you like. Note that this is just an interface. If it does not work for your Kinesis adapter, you may create a new interface. For existing usage of the KeyedDeserializationSchema, please have a look at the FlinkKafkaConsumer.

Cheers,
Max
Hi Max,
Thanks for the quick response and clarification :)
I got a bit confused and thought that Flink internals would be accessing this interface too.

Cheers,
Gordon
Hi Gordon,
Thank you for starting the discussion. I think the KeyedDeserializationSchema is in fact located in the wrong package. Its methods are very Kafka-specific, so maybe I should move it to the Kafka connector.

What would the deserialization schema for Kinesis look like? Does the Kinesis API return byte[]? Is there any other information which would be useful for users?
Hi Robert,
+1 for a change to where the KeyedDeserializationSchema is located. I was just starting to wonder how I should name the Kinesis deserialization schema if I were to create another one in the same package.

For Kinesis, the API returns a String for the key, byte[] for the value, a String for the stream name (similar to a Kafka topic), and a String for the offset. So I would definitely need to create a new deserialization schema for this.

For https://issues.apache.org/jira/browse/FLINK-3229, I'll create the required new interfaces in Kinesis-connector-specific packages. I'd be happy to help with relocating the current KeyedDeserializationSchema-related interfaces and classes to a Kafka-specific package as a separate issue, if you want to =)

Cheers,
Gordon
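A possible shape for such a Kinesis-specific schema, going by the fields listed above (this is only a sketch; all names here are illustrative and the eventual interface in the Kinesis connector may look different):

    import java.io.IOException;
    import java.io.Serializable;

    // Hypothetical sketch of a Kinesis-specific deserialization schema.
    // The offset is a String because Kinesis sequence numbers do not fit in a long.
    public interface KinesisDeserializationSchema<T> extends Serializable {

        // partitionKey and value come from the Kinesis record; streamName and
        // sequenceNumber describe where in the stream the record was read.
        T deserialize(String partitionKey, byte[] value, String streamName, String sequenceNumber)
                throws IOException;
    }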
I'll relocate the KeyedDeserializationSchema as part of the Kafka 0.9.0.0 support (it's a pending pull request that I'll merge soon).
While the deserialization schema is not used by the Flink internals, I think the initial idea was to use it across different sources/sinks (like Kafka, Socket, RabbitMQ, ...).

Does it make sense to have a KafkaDeSerializationSchema and then wrap the common serialization schemata?
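One way to read this proposal: a system-specific schema can simply delegate to a plain DeserializationSchema and ignore the extra metadata, roughly like the sketch below (the class name is illustrative; package locations of DeserializationSchema and TypeInformation vary across Flink versions, and later Kafka connector versions ship a similar wrapper class):

    import java.io.IOException;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;

    // Sketch: adapts a plain DeserializationSchema to the keyed, metadata-aware
    // interface; the key, topic, partition and offset are simply ignored.
    public class KeyedSchemaWrapper<T> implements KeyedDeserializationSchema<T> {

        private final DeserializationSchema<T> wrapped;

        public KeyedSchemaWrapper(DeserializationSchema<T> wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
                throws IOException {
            // only the message payload is handed to the wrapped schema
            return wrapped.deserialize(message);
        }

        @Override
        public boolean isEndOfStream(T nextElement) {
            return wrapped.isEndOfStream(nextElement);
        }

        @Override
        public TypeInformation<T> getProducedType() {
            return wrapped.getProducedType();
        }
    }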
Hi Stephan,
A comment on this: for the KeyedDeserializationSchema, I don't think a common version is necessary. As previously explained, the interfaces for the KeyedDeserializationSchema of Kafka / Kinesis can be quite different, and may also be specific to future external systems that we might implement connectors for. Wrapper classes for a common KeyedDeserializationSchema don't seem to make sense, since in the end we will still need to expose system-specific interfaces to the user.

It may be reasonable to keep only the most simple DeserializationSchema interfaces and wrappers in flink-streaming-java. By a simple KeyedDeserializationSchema, I mean deserialize() methods that only take the key as byte[] and the message as byte[]. If new connectors happen to require more specific interfaces, they can create them in their own module (flink-connector-*).

Cheers,
Gordon
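Such a minimal, connector-agnostic keyed schema could look something like the following (a sketch of the idea only, not an existing Flink interface):

    import java.io.IOException;
    import java.io.Serializable;

    // Sketch of the "most simple" keyed schema suggested above: only the raw key
    // and message bytes, with no system-specific metadata such as topic, offset,
    // stream name or sequence number.
    public interface SimpleKeyedDeserializationSchema<T> extends Serializable {

        T deserialize(byte[] key, byte[] message) throws IOException;
    }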
Hi Gordon,
I'll move the KeyedDeserializationSchema to the Kafka module.
Hi Robert,
No problem, thanks for the notice!