Failed to register Protobuf Kryo serialization

Svend Vanderveken
Hi all,

I'm failing to set up an example of wire serialization with Protobuf. Could
you help me figure out what I'm doing wrong?

I'm using a simple protobuf schema:

```

syntax = "proto3";

import "google/protobuf/wrappers.proto";
option java_multiple_files = true;

message DemoUserEvent {
  Metadata metadata = 1;
  oneof payload {
    Created created = 10;
    Updated updated = 11;
  }

  message Created {...}

  message Updated {...}

  ...

}

```


I generate the Java classes from it with this Gradle plugin:


```

plugins {
    id "com.google.protobuf" version "0.8.15"
}

```


I generate DemoUserEvent instances with a Java Iterator that looks like this:


```

public class UserEventGenerator implements Iterator<DemoUserEvent>, Serializable {
    // static fields are never serialized, so no transient modifier is needed
    public static final Faker faker = new Faker();
    ...
    @Override
    public DemoUserEvent next() {
        return randomCreatedEvent();
    }

    ...

```


I read these two pieces of documentation:
* https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html
* https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html

And tried the demo app below:

```

import com.twitter.chill.protobuf.ProtobufSerializer;

...

public static void main(String[] args) {
    final StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    flinkEnv.getConfig().registerTypeWithKryoSerializer(DemoUserEvent.class, ProtobufSerializer.class);
    flinkEnv.fromCollection(new UserEventGenerator(), DemoUserEvent.class).print();
}

```

But the serialization mechanism still fails to handle my protobuf class:

11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - class live.schema.event.user.v1.DemoUserEvent does not contain a getter for field payloadCase_
11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - class live.schema.event.user.v1.DemoUserEvent does not contain a setter for field payloadCase_
11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - Class class live.schema.event.user.v1.DemoUserEvent cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
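
To make the INFO messages above more concrete, here is a minimal, self-contained sketch of the kind of getter/setter check they describe. It is only an approximation for illustration, not Flink's actual TypeExtractor code: I am assuming the check boils down to "a public accessor whose name matches the field (ignoring case and underscores) and whose type matches the field type". `FakeMessage`, `PlainBean` and `PojoFieldCheck` are hypothetical names; `FakeMessage` only imitates the shape of a protobuf-generated class, with an `int payloadCase_` field and a `getPayloadCase()` accessor that returns an enum.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoFieldCheck {

    // Hypothetical stand-in for a protobuf-generated message with a oneof:
    // the field is an int, but the accessor returns an enum, and there is no setter.
    static class FakeMessage {
        enum PayloadCase { CREATED, UPDATED, PAYLOAD_NOT_SET }

        private int payloadCase_ = 0;

        public PayloadCase getPayloadCase() {
            return payloadCase_ == 10 ? PayloadCase.CREATED
                 : payloadCase_ == 11 ? PayloadCase.UPDATED
                 : PayloadCase.PAYLOAD_NOT_SET;
        }
    }

    // Hypothetical plain bean that passes the check.
    static class PlainBean {
        private String name;

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Assumed normalization: compare names ignoring case and underscores.
    static String norm(String s) {
        return s.toLowerCase().replace("_", "");
    }

    static boolean hasGetter(Class<?> type, Field f) {
        String wanted = "get" + norm(f.getName());
        for (Method m : type.getMethods()) {
            if (norm(m.getName()).equals(wanted)
                    && m.getParameterCount() == 0
                    && m.getReturnType() == f.getType()) {
                return true;
            }
        }
        return false;
    }

    static boolean hasSetter(Class<?> type, Field f) {
        String wanted = "set" + norm(f.getName());
        for (Method m : type.getMethods()) {
            if (norm(m.getName()).equals(wanted)
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0] == f.getType()) {
                return true;
            }
        }
        return false;
    }

    // Lists every non-public instance field that lacks a matching getter or setter.
    static List<String> problems(Class<?> type) {
        List<String> out = new ArrayList<>();
        for (Field f : type.getDeclaredFields()) {
            int mod = f.getModifiers();
            if (Modifier.isStatic(mod) || Modifier.isTransient(mod)
                    || Modifier.isPublic(mod) || f.isSynthetic()) {
                continue; // skipped in this sketch
            }
            if (!hasGetter(type, f)) out.add("no getter for field " + f.getName());
            if (!hasSetter(type, f)) out.add("no setter for field " + f.getName());
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [no getter for field payloadCase_, no setter for field payloadCase_]
        System.out.println(problems(FakeMessage.class));
        // prints []
        System.out.println(problems(PlainBean.class));
    }
}
```

If this approximation is close to what happens, a generated protobuf class would always fail the POJO analysis and be handled as a GenericType, whether or not a Kryo serializer is registered for it. That may mean these log lines alone do not prove the registration failed.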

I've also tried this, without success:

```

flinkEnv.getConfig().addDefaultKryoSerializer(DemoUserEvent.class, ProtobufSerializer.class);

```


I'm using these versions:

```

ext {
    javaVersion = '11'
    flinkVersion = '1.12.1'
    scalaBinaryVersion = '2.12'
}

dependencies {
    compileOnly "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
    implementation ("com.twitter:chill-protobuf:0.9.5") {
        exclude group: 'com.esotericsoftware.kryo', module: 'kryo'
    }
    implementation "com.google.protobuf:protobuf-java:3.14.0"
    implementation 'com.github.javafaker:javafaker:1.0.2'
}

```


Any idea what I should try next?

Thanks in advance!
Re: Failed to register Protobuf Kryo serialization

Svend Vanderveken
Sorry, I realize I posted this to the wrong list. Please ignore it; I'll
repost it to the flink-user list.


--
Svend Vanderveken
Kelesia SPRL - BE 0839 049 010
blog: https://svend.kelesia.com <http://svend.kelesia.com/>
Twitter: @sv3ndk <https://twitter.com/sv3ndk>