Posted by
Shang Yuanchun (Jira) on
URL: http://deprecated-apache-flink-mailing-list-archive.368.s1.nabble.com/jira-Created-FLINK-22925-FieldDescriptor-does-not-match-message-type-ERROR-when-use-protobuf-router-tp51232.html
Bill lee created FLINK-22925:
--------------------------------
Summary: "FieldDescriptor does not match message type" ERROR when use protobuf-router
Key: FLINK-22925
URL:
https://issues.apache.org/jira/browse/FLINK-22925 Project: Flink
Issue Type: Bug
Components: Stateful Functions
Affects Versions: statefun-3.0.0
Environment: Flink 1.12.2
stateful function 3.0.0
Reporter: Bill lee
Attachments: image-2021-06-08-21-22-01-748.png
When use protobuf-router in I/O module as follow:
{code:java}
version: "3.0"
module:
meta:
type: remote
spec:
endpoints:
- endpoint:
meta:
kind: http
spec:
functions: dga/*
urlPathTemplate:
http://cic02:9999/statefun timeouts:
call: 2min
ingresses:
- ingress:
meta:
type: statefun.kafka.io/protobuf-ingress
id: dga/names
spec:
address: cic02:9092
consumerGroupId: my-group-id
topics:
- index_events
messageType: com.my.protobuf.XxMessage
descriptorSet: classpath:stream.desc
egresses:
- egress:
meta:
type: io.statefun.kafka/egress
id: dga/greets
spec:
address: cic02:9092
deliverySemantic:
type: exactly-once
transactionTimeoutMillis: 100000
routers:
- router:
meta:
type: org.apache.flink.statefun.sdk/protobuf-router
spec:
ingress: dga/names
target: "dga/person/{{$.src_ip}}"
messageType: com.my.protobuf.XxMessage
descriptorSet: classpath:stream.desc
{code}
I got protobuf error : "FieldDescriptor does not match message type"
!image-2021-06-08-21-22-01-748.png!
And then I make a test like this:
{code:java}
@Test
public void exampleUsage01() throws IOException {
Message originalMessage = SimpleMessage.newBuilder().setName("bob").build();
ProtobufDescriptorMap descriptorPath01 = ProtobufDescriptorMap.from(FILE_DESCRIPTOR_SET);
Optional<Descriptors.GenericDescriptor> maybeDescriptor01 =
descriptorPath01.getDescriptorByName("org.apache.flink.test.SimpleMessage");
Descriptors.Descriptor descriptor = (Descriptors.Descriptor) maybeDescriptor01.get();
DynamicMessage dynamicMessage = DynamicMessage.getDefaultInstance(descriptor);
Parser<? extends Message> parser = dynamicMessage.getParserForType();
Message message = parser.parseFrom(originalMessage.toByteArray());
ProtobufDescriptorMap descriptorPath = ProtobufDescriptorMap.from(FILE_DESCRIPTOR_SET); Optional<Descriptors.GenericDescriptor> maybeDescriptor =
descriptorPath.getDescriptorByName("org.apache.flink.test.SimpleMessage");
AddressResolver evaluator = AddressResolver.fromAddressTemplate((Descriptors.Descriptor) maybeDescriptor.get(), "dga/person/{{$.name}}");
Address targetAddress = evaluator.evaluate(message);
System.out.println(targetAddress);
}
{code}
also got the same error.
I think the cause is that , descriptorSet is defied in both ingress and router, and generated two different Descriptors for the message.
Please correct me if I am wrong.
And any advise for this problem? Thanks a lot.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)