Bill lee created FLINK-22925:
--------------------------------

             Summary: "FieldDescriptor does not match message type" ERROR when use protobuf-router
                 Key: FLINK-22925
                 URL: https://issues.apache.org/jira/browse/FLINK-22925
             Project: Flink
          Issue Type: Bug
          Components: Stateful Functions
    Affects Versions: statefun-3.0.0
         Environment: Flink 1.12.2
                      stateful function 3.0.0
            Reporter: Bill lee
         Attachments: image-2021-06-08-21-22-01-748.png

When using the protobuf-router in the I/O module as follows:

{code:java}
version: "3.0"
module:
  meta:
    type: remote
  spec:
    endpoints:
      - endpoint:
          meta:
            kind: http
          spec:
            functions: dga/*
            urlPathTemplate: http://cic02:9999/statefun
            timeouts:
              call: 2min
    ingresses:
      - ingress:
          meta:
            type: statefun.kafka.io/protobuf-ingress
            id: dga/names
          spec:
            address: cic02:9092
            consumerGroupId: my-group-id
            topics:
              - index_events
            messageType: com.my.protobuf.XxMessage
            descriptorSet: classpath:stream.desc
    egresses:
      - egress:
          meta:
            type: io.statefun.kafka/egress
            id: dga/greets
          spec:
            address: cic02:9092
            deliverySemantic:
              type: exactly-once
              transactionTimeoutMillis: 100000
    routers:
      - router:
          meta:
            type: org.apache.flink.statefun.sdk/protobuf-router
          spec:
            ingress: dga/names
            target: "dga/person/{{$.src_ip}}"
            messageType: com.my.protobuf.XxMessage
            descriptorSet: classpath:stream.desc
{code}

I got the protobuf error: "FieldDescriptor does not match message type"

!image-2021-06-08-21-22-01-748.png!

I then wrote a test like this:

{code:java}
@Test
public void exampleUsage01() throws IOException {
  Message originalMessage = SimpleMessage.newBuilder().setName("bob").build();

  // First load of the descriptor set: used to parse the bytes into a DynamicMessage
  // (this mirrors what the ingress does).
  ProtobufDescriptorMap descriptorPath01 = ProtobufDescriptorMap.from(FILE_DESCRIPTOR_SET);
  Optional<Descriptors.GenericDescriptor> maybeDescriptor01 =
      descriptorPath01.getDescriptorByName("org.apache.flink.test.SimpleMessage");
  Descriptors.Descriptor descriptor = (Descriptors.Descriptor) maybeDescriptor01.get();
  DynamicMessage dynamicMessage = DynamicMessage.getDefaultInstance(descriptor);
  Parser<? extends Message> parser = dynamicMessage.getParserForType();
  Message message = parser.parseFrom(originalMessage.toByteArray());

  // Second, independent load of the same descriptor set: used to build the address resolver
  // (this mirrors what the router does).
  ProtobufDescriptorMap descriptorPath = ProtobufDescriptorMap.from(FILE_DESCRIPTOR_SET);
  Optional<Descriptors.GenericDescriptor> maybeDescriptor =
      descriptorPath.getDescriptorByName("org.apache.flink.test.SimpleMessage");
  AddressResolver evaluator =
      AddressResolver.fromAddressTemplate(
          (Descriptors.Descriptor) maybeDescriptor.get(), "dga/person/{{$.name}}");

  // Evaluating the template against the message fails with the error above.
  Address targetAddress = evaluator.evaluate(message);
  System.out.println(targetAddress);
}
{code}

It also produced the same error.

I think the cause is that descriptorSet is defined in both the ingress and the router, which generates two different Descriptors for the same message. Please correct me if I am wrong.

Any advice on this problem?

Thanks a lot.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
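
To make the suspected cause concrete, here is a toy sketch of how loading the same descriptor set twice could trigger such an error. It rests on an assumption: that the protobuf runtime checks a field's containing descriptor against the message's descriptor by object identity rather than by name. The classes ToyDescriptor, ToyField, ToyMessage and DescriptorIdentityDemo are hypothetical stand-ins written for this illustration; they are not the protobuf-java or StateFun classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a message descriptor (NOT com.google.protobuf.Descriptors.Descriptor).
final class ToyDescriptor {
    final String fullName;
    private final Map<String, ToyField> fields = new HashMap<>();

    ToyDescriptor(String fullName, String... fieldNames) {
        this.fullName = fullName;
        for (String name : fieldNames) {
            fields.put(name, new ToyField(name, this));
        }
    }

    ToyField field(String name) {
        return fields.get(name);
    }
}

// Hypothetical stand-in for a field descriptor; it remembers which descriptor object owns it.
final class ToyField {
    final String name;
    final ToyDescriptor containingType;

    ToyField(String name, ToyDescriptor containingType) {
        this.name = name;
        this.containingType = containingType;
    }
}

// Hypothetical stand-in for a dynamic message bound to one descriptor object.
final class ToyMessage {
    private final ToyDescriptor type;
    private final Map<String, Object> values = new HashMap<>();

    ToyMessage(ToyDescriptor type) {
        this.type = type;
    }

    ToyMessage set(String name, Object value) {
        values.put(name, value);
        return this;
    }

    Object get(ToyField field) {
        // Assumed behaviour: identity comparison, so an equal-looking descriptor is rejected.
        if (field.containingType != type) {
            throw new IllegalArgumentException("FieldDescriptor does not match message type.");
        }
        return values.get(field.name);
    }
}

final class DescriptorIdentityDemo {
    // Simulates parsing the descriptor set file; every call yields a fresh object graph.
    static ToyDescriptor loadDescriptorSet() {
        return new ToyDescriptor("com.my.protobuf.XxMessage", "src_ip");
    }

    // Simulates the router resolving the target address template against a message.
    static String route(ToyMessage message, ToyDescriptor routerDescriptor) {
        return "dga/person/" + message.get(routerDescriptor.field("src_ip"));
    }

    public static void main(String[] args) {
        ToyDescriptor ingressSide = loadDescriptorSet();
        ToyDescriptor routerSide = loadDescriptorSet(); // second, independent load
        ToyMessage message = new ToyMessage(ingressSide).set("src_ip", "10.0.0.1");

        System.out.println(route(message, ingressSide)); // same descriptor object: works
        try {
            route(message, routerSide); // same name, different object: rejected
        } catch (IllegalArgumentException e) {
            System.out.println("Failed: " + e.getMessage());
        }
    }
}
```

If the assumption holds, the failure goes away whenever the code that parses the message and the code that resolves the address share a single descriptor instance, which matches the observation that the test above fails only because it loads the descriptor set twice.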