[jira] [Created] (FLINK-22925) "FieldDescriptor does not match message type" ERROR when use protobuf-router

Shang Yuanchun (Jira)
Bill lee created FLINK-22925:
--------------------------------

             Summary: "FieldDescriptor does not match message type" ERROR  when use protobuf-router
                 Key: FLINK-22925
                 URL: https://issues.apache.org/jira/browse/FLINK-22925
             Project: Flink
          Issue Type: Bug
          Components: Stateful Functions
    Affects Versions: statefun-3.0.0
         Environment: Flink 1.12.2, Stateful Functions 3.0.0
            Reporter: Bill lee
         Attachments: image-2021-06-08-21-22-01-748.png

When I use the protobuf-router in an I/O module as follows:
{code:java}
version: "3.0"
module:
  meta:
    type: remote
  spec:
    endpoints:
      - endpoint:
          meta:
            kind: http
          spec:
            functions: dga/*
            urlPathTemplate: http://cic02:9999/statefun
            timeouts:
              call: 2min
    ingresses:
      - ingress:
          meta:
            type: statefun.kafka.io/protobuf-ingress
            id: dga/names
          spec:
            address: cic02:9092
            consumerGroupId: my-group-id
            topics:
              - index_events
            messageType: com.my.protobuf.XxMessage
            descriptorSet: classpath:stream.desc    
    egresses:
      - egress:
          meta:
            type: io.statefun.kafka/egress
            id: dga/greets
          spec:
            address: cic02:9092
            deliverySemantic:
              type: exactly-once
              transactionTimeoutMillis: 100000    
    routers:
      - router:
          meta:
            type: org.apache.flink.statefun.sdk/protobuf-router
          spec:
            ingress: dga/names
            target: "dga/person/{{$.src_ip}}"
            messageType: com.my.protobuf.XxMessage
            descriptorSet: classpath:stream.desc
{code}
I get the protobuf error "FieldDescriptor does not match message type":

!image-2021-06-08-21-22-01-748.png!

I then wrote a test like this:
{code:java}
  @Test
  public void exampleUsage01() throws IOException {
    Message originalMessage = SimpleMessage.newBuilder().setName("bob").build();

    // First load of the descriptor set: used to parse the bytes into a DynamicMessage.
    ProtobufDescriptorMap descriptorPath01 = ProtobufDescriptorMap.from(FILE_DESCRIPTOR_SET);
    Optional<Descriptors.GenericDescriptor> maybeDescriptor01 =
            descriptorPath01.getDescriptorByName("org.apache.flink.test.SimpleMessage");
    Descriptors.Descriptor descriptor = (Descriptors.Descriptor) maybeDescriptor01.get();
    DynamicMessage dynamicMessage = DynamicMessage.getDefaultInstance(descriptor);
    Parser<? extends Message> parser = dynamicMessage.getParserForType();
    Message message = parser.parseFrom(originalMessage.toByteArray());

    // Second load of the same descriptor set: used to build the address resolver.
    ProtobufDescriptorMap descriptorPath = ProtobufDescriptorMap.from(FILE_DESCRIPTOR_SET);
    Optional<Descriptors.GenericDescriptor> maybeDescriptor =
            descriptorPath.getDescriptorByName("org.apache.flink.test.SimpleMessage");
    AddressResolver evaluator = AddressResolver.fromAddressTemplate(
            (Descriptors.Descriptor) maybeDescriptor.get(), "dga/person/{{$.name}}");

    // Resolve the target address from the message parsed with the first descriptor.
    Address targetAddress = evaluator.evaluate(message);
    System.out.println(targetAddress);
  }
{code}
This test fails with the same error.

I think the cause is that descriptorSet is defined in both the ingress and the router, so two different Descriptor instances are generated for the same message type.
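
To make the suspected cause concrete, here is a minimal stand-alone sketch. It does not use protobuf or Stateful Functions: the classes Descriptor, Field and Message and the method load() are hypothetical stand-ins written only for illustration. The assumption being modeled is that a dynamic message accepts a field descriptor only if it belongs to the very same descriptor instance (compared by reference, not by name) that the message was built from.

```java
import java.util.HashMap;
import java.util.Map;

public class DescriptorIdentityDemo {

    /** Hypothetical stand-in for a field descriptor; it remembers its owning type. */
    public static final class Field {
        public final String name;
        public final Descriptor containingType;

        Field(String name, Descriptor containingType) {
            this.name = name;
            this.containingType = containingType;
        }
    }

    /** Hypothetical stand-in for a message descriptor. */
    public static final class Descriptor {
        public final String fullName;
        private final Map<String, Field> fields = new HashMap<>();

        public Descriptor(String fullName, String... fieldNames) {
            this.fullName = fullName;
            for (String fieldName : fieldNames) {
                fields.put(fieldName, new Field(fieldName, this));
            }
        }

        public Field findFieldByName(String fieldName) {
            return fields.get(fieldName);
        }
    }

    /** Hypothetical stand-in for a dynamic message bound to one descriptor instance. */
    public static final class Message {
        private final Descriptor type;
        private final Map<String, Object> values = new HashMap<>();

        public Message(Descriptor type) {
            this.type = type;
        }

        public Message set(Field field, Object value) {
            verifyContainingType(field);
            values.put(field.name, value);
            return this;
        }

        public Object get(Field field) {
            verifyContainingType(field);
            return values.get(field.name);
        }

        // Reference comparison: an equal name on another instance is not enough.
        private void verifyContainingType(Field field) {
            if (field.containingType != type) {
                throw new IllegalArgumentException(
                        "FieldDescriptor does not match message type.");
            }
        }
    }

    /** Simulates loading the same descriptor set; every call builds a new instance. */
    public static Descriptor load() {
        return new Descriptor("org.apache.flink.test.SimpleMessage", "name");
    }

    public static void main(String[] args) {
        Descriptor ingressSide = load(); // e.g. the descriptor used to parse the record
        Descriptor routerSide = load();  // e.g. the descriptor used to evaluate the template

        Message message = new Message(ingressSide)
                .set(ingressSide.findFieldByName("name"), "bob");

        // Same instance: the lookup succeeds.
        System.out.println(message.get(ingressSide.findFieldByName("name")));

        // Different instance with identical content: the lookup is rejected.
        try {
            message.get(routerSide.findFieldByName("name"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the first lookup prints bob and the second prints the error text, even though both descriptors describe the same message. If the real behaviour matches this model, sharing a single descriptor instance between parsing and address resolution would avoid the mismatch.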

Please correct me if I am wrong.

Is there any advice for this problem? Thanks a lot.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)