Serialization Issue

Serialization Issue

Alexx
My stream is producing records of type Tuple2<String, String>.
Their .toString() output is (usr12345,{"_key":"usr12345","_temperature":46.6}),
where the key is usr12345 and the value is {"_key":"usr12345","_temperature":46.6}.
Calling .print() on the stream outputs the records correctly: (usr12345,{"_key":"usr12345","_temperature":46.6})
But when I write the stream to Kafka, the key becomes " usr12345" and the value "({"_key":"usr12345","_temperature":46.6}".
Notice the space at the beginning of the key and the left parenthesis at the beginning of the value.
Very strange. Why might this happen?

Here is the serialization code:

TypeInformation<String> resultType = TypeInformation.of(String.class);

KeyedSerializationSchema<Tuple2<String, String>> schema =
        new TypeInformationKeyValueSerializationSchema<>(resultType, resultType, env.getConfig());

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration flinkKafkaProducerConfig =
        FlinkKafkaProducer010.writeToKafkaWithTimestamps(
                stream,
                "topic",
                schema,
                kafkaProducerProperties);

Re: Serialization Issue

Aljoscha Krettek-2
Hi,

TypeInformationKeyValueSerializationSchema uses Flink's TypeSerializers to serialize those Strings, so what you're seeing in Kafka is the binary representation of those Strings, which, admittedly, closely resembles the originals. If you want the data to land in Kafka as plain Strings, I think you need to write a custom KeyedSerializationSchema for your type.
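
A minimal sketch of such a custom schema could simply write both fields as UTF-8 bytes (the class name here is just an example, and the stray leading "space" and "(" you're seeing are most likely length-prefix bytes from that binary format):

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

// Writes both fields as plain UTF-8 bytes instead of Flink's binary
// TypeSerializer format. The class name is an illustrative choice.
public class StringTupleSerializationSchema
        implements KeyedSerializationSchema<Tuple2<String, String>> {

    @Override
    public byte[] serializeKey(Tuple2<String, String> element) {
        // f0 holds the key, e.g. "usr12345"
        return element.f0.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(Tuple2<String, String> element) {
        // f1 holds the JSON value, e.g. {"_key":"usr12345","_temperature":46.6}
        return element.f1.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(Tuple2<String, String> element) {
        // Returning null means the topic passed to the producer is used.
        return null;
    }
}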

Best,
Aljoscha



Re: Serialization Issue

Alexx
Thanks!

I tested a custom KeyedSerializationSchema and it indeed works.
I had tried TypeInformationKeyValueSerializationSchema just to make things a bit more automated :)
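
For reference, wiring such a custom schema into the original producer setup would look roughly like this (reusing the hypothetical StringTupleSerializationSchema sketched above):

// Same producer setup as before, with the hypothetical StringTupleSerializationSchema
// replacing TypeInformationKeyValueSerializationSchema.
KeyedSerializationSchema<Tuple2<String, String>> schema = new StringTupleSerializationSchema();

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration flinkKafkaProducerConfig =
        FlinkKafkaProducer010.writeToKafkaWithTimestamps(
                stream,
                "topic",
                schema,
                kafkaProducerProperties);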