Hi all,
I'm failing to set up an example of wire serialization with Protobuf. Could you help me figure out what I'm doing wrong?

I'm using a simple protobuf schema:

```
syntax = "proto3";

import "google/protobuf/wrappers.proto";
option java_multiple_files = true;

message DemoUserEvent {
  Metadata metadata = 1;
  oneof payload {
    Created created = 10;
    Updated updated = 11;
  }

  message Created {...}

  message Updated {...}

  ...
}
```

From which I'm generating Java with this Gradle plugin:

```
plugins {
    id "com.google.protobuf" version "0.8.15"
}
```

And I'm generating DemoUserEvent instances with a Java Iterator that looks like this:

```
public class UserEventGenerator implements Iterator<DemoUserEvent>, Serializable {
    transient public final static Faker faker = new Faker();
    ...
    @Override public DemoUserEvent next() {
        return randomCreatedEvent();
    }
    ...
```

I read these two pieces of documentation:
* https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html
* https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html

And tried the demo app below:

```
import com.twitter.chill.protobuf.ProtobufSerializer;
...

public static void main(String[] args) {
    final StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    flinkEnv.getConfig().registerTypeWithKryoSerializer(DemoUserEvent.class, ProtobufSerializer.class);
    flinkEnv.fromCollection(new UserEventGenerator(), DemoUserEvent.class).print();
}
```

But the serialization mechanism still fails to handle my protobuf class:

11:22:45,822 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class live.schema.event.user.v1.DemoUserEvent does not contain a getter for field payloadCase_
11:22:45,822 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class live.schema.event.user.v1.DemoUserEvent does not contain a setter for field payloadCase_
11:22:45,822 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class live.schema.event.user.v1.DemoUserEvent cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

I've also tried this, without success:

```
flinkEnv.getConfig().addDefaultKryoSerializer(DemoUserEvent.class, ProtobufSerializer.class);
```

I'm using these versions:

```
ext {
    javaVersion = '11'
    flinkVersion = '1.12.1'
    scalaBinaryVersion = '2.12'
}

dependencies {
    compileOnly "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
    implementation ("com.twitter:chill-protobuf:0.9.5") {
        exclude group: 'com.esotericsoftware.kryo', module: 'kryo'
    }
    implementation "com.google.protobuf:protobuf-java:3.14.0"
    implementation 'com.github.javafaker:javafaker:1.0.2'
}
```

Any idea what I should try next?

Thanks in advance!
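
For readers puzzling over the same log lines, here is a rough, standalone sketch of the kind of getter/setter check those INFO messages describe. It is a loose JavaBean-style approximation written for illustration, not Flink's actual TypeExtractor code. The classes `GeneratedLike` and `PlainBean` and the method `fieldsWithoutAccessors` are hypothetical names invented here. `GeneratedLike` only mimics one trait of protobuf-generated messages: a private `payloadCase_` field with no setter, since generated messages are immutable. As the log text itself says, a class that fails this kind of check is processed as a GenericType; the Flink documentation on "Data Types & Serialization" describes generic types as being serialized through Kryo.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Illustrative approximation only; this is NOT Flink's TypeExtractor logic.
class PojoCheckSketch {

    // Hypothetical stand-in for a protobuf-generated message:
    // a private field with a trailing underscore and no setter.
    static class GeneratedLike {
        private final int payloadCase_ = 0;
        public int getPayloadCase() { return payloadCase_; }
    }

    // Hypothetical plain bean with a conventional getter/setter pair.
    static class PlainBean {
        private String name;
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Returns the names of non-public instance fields that lack a public
    // getter or a public setter under a loose naming match.
    static List<String> fieldsWithoutAccessors(Class<?> type) {
        List<String> missing = new ArrayList<>();
        for (Field f : type.getDeclaredFields()) {
            int mods = f.getModifiers();
            if (f.isSynthetic() || Modifier.isStatic(mods)
                    || Modifier.isTransient(mods) || Modifier.isPublic(mods)) {
                continue; // these fields need no accessor in this sketch
            }
            boolean hasGetter = hasAccessor(type, "get", f.getName(), 0);
            boolean hasSetter = hasAccessor(type, "set", f.getName(), 1);
            if (!hasGetter || !hasSetter) {
                missing.add(f.getName());
            }
        }
        return missing;
    }

    // Loose match: compares names case-insensitively with underscores removed.
    private static boolean hasAccessor(Class<?> type, String prefix,
                                       String fieldName, int paramCount) {
        String wanted = normalize(prefix + fieldName);
        for (Method m : type.getMethods()) {
            if (normalize(m.getName()).equals(wanted)
                    && m.getParameterCount() == paramCount) {
                return true;
            }
        }
        return false;
    }

    private static String normalize(String s) {
        return s.replace("_", "").toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(fieldsWithoutAccessors(GeneratedLike.class));
        System.out.println(fieldsWithoutAccessors(PlainBean.class));
    }
}
```

In this sketch `GeneratedLike` is reported because `payloadCase_` has no setter, while `PlainBean` passes. The real rules in Flink are stricter and more detailed, so treat this only as a way to read the log messages.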
Sorry, I realize I posted this to the wrong list, please ignore and I'll post it to the flink-user one.

On Sun, Feb 14, 2021 at 11:41 AM Svend Vanderveken <[hidden email]> wrote:
> Hi all,
>
> I'm failing to setup an example of wire serialization with Protobuf, could
> you help me figure out what I'm doing wrong?
> [...]

--
Svend Vanderveken
Kelesia SPRL - BE 0839 049 010
blog: https://svend.kelesia.com <http://svend.kelesia.com/>
Twitter: @sv3ndk <https://twitter.com/sv3ndk>