Hi,

Based on the example from https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greeter-example I am trying to ingest JSON data from Kafka, but have been unable to get it working based on the examples.
event-generator.py:

def produce():
    request = {}
    request['id'] = "abc-123"
    request['field1'] = "field1-1"
    request['field2'] = "field2-2"
    request['field3'] = "field3-3"
    if len(sys.argv) == 2:
        delay_seconds = int(sys.argv[1])
    else:
        delay_seconds = 1
    producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
    for request in random_requests_dict():
        producer.send(topic='test-topic', value=json.dumps(request).encode('utf-8'))
        producer.flush()
        time.sleep(delay_seconds)

Below is the proto definition of the JSON data (I don't always know all the fields, but I know the id field definitely exists).

message.proto:

message MyRow {
    string id = 1;
    google.protobuf.Struct message = 2;
}

Below is the greeter that receives the data.

tokenizer.py (same as greeter.py, but saving the id in state instead of counting):

@app.route('/statefun', methods=['POST'])
def handle():
    my_row = MyRow()
    data = my_row.ParseFromString(request.data)  # Is this the right way to do it?
    response_data = handler(request.data)
    response = make_response(response_data)
    response.headers.set('Content-Type', 'application/octet-stream')
    return response

But below is the error message. I am a newbie with proto and would appreciate any help.

11:55:17,996 tokenizer ERROR Exception on /statefun [POST]
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/app/tokenizer.py", line 101, in handle
    response_data = handler(data)
  File "/usr/local/lib/python3.8/site-packages/statefun/request_reply.py", line 38, in __call__
    request.ParseFromString(request_bytes)
  File "/usr/local/lib/python3.8/site-packages/google/protobuf/message.py", line 199, in ParseFromString
    return self.MergeFromString(serialized)
  File "/usr/local/lib/python3.8/site-packages/google/protobuf/internal/python_message.py", line 1131, in MergeFromString
    serialized = memoryview(serialized)
TypeError: memoryview: a bytes-like object is required, not 'int'
Hi,
The values must be valid encoded Protobuf messages [1], while in your attached code snippet you are sending UTF-8 encoded JSON strings. You can take a look at this example with a generator that produces Protobuf messages [2][3].

[1] https://developers.google.com/protocol-buffers/docs/pythontutorial
[2] https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/generator/event-generator.py#L37
[3] https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/greeter/messages.proto#L25
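For illustration, here is a minimal sketch of what produce() could look like if it sent MyRow Protobuf messages instead of JSON strings. This assumes a message_pb2 module generated with protoc from your message.proto; KAFKA_BROKER and random_requests_dict are taken from your original script.

import time

from kafka import KafkaProducer
from message_pb2 import MyRow  # assumed: generated by protoc from message.proto

def produce(delay_seconds=1):
    producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
    for request in random_requests_dict():
        row = MyRow()
        row.id = request.pop('id')   # 'id' is always present
        row.message.update(request)  # the remaining fields go into the Struct
        # SerializeToString() yields the Protobuf wire format that the
        # StateFun Kafka ingress expects as the record value.
        producer.send(topic='test-topic', value=row.SerializeToString())
        producer.flush()
        time.sleep(delay_seconds)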
Thanks Igal,
I don't have control over the data source inside Kafka (the current Kafka topic contains either JSON or Avro formats only; I am trying to reproduce this scenario using my test data generator). Is it possible to convert the JSON to proto at the receiving end of the StateFun application?
Checking to see if this is currently possible:
Read JSON data from a Kafka topic => process using StateFun => write out to Kafka in JSON format.

I could have a separate process that reads the source JSON data and converts it to Protobuf into another Kafka topic, but that sounds inefficient, e.g.:

Read JSON data from a Kafka topic => convert JSON to Protobuf => process using StateFun => write out to Kafka in Protobuf format => convert Protobuf back to JSON messages.

I would appreciate any advice on how to process JSON messages using StateFun. Also, if this is not possible in the current Python SDK, can I do it using the Java/Scala SDK?

Thanks.
(Forwarding this to user@ as it is better suited there.)
Hi Sunil,

With remote functions (using the Python SDK), messages sent to and from them must be Protobuf messages. This is a requirement because remote functions can be written in any language, and we use Protobuf as the means for cross-language messaging.

If you are defining Kafka ingresses in a remote module (via textual YAML module configs), then records in the Kafka ingress will be routed directly to the remote functions, and therefore they are required to be Protobuf messages as well.

With embedded functions (using the current Java SDK), what you are trying to do is possible. When using the Java SDK, the Kafka ingress allows you to provide a `KafkaIngressDeserializer` [1], where you can convert the bytes in Kafka into any type you intend to use for messaging within the StateFun application. So there, you can convert your JSON records.

If you still want to write your main application logic in Python, but the input and output messages in Kafka are required to be JSON, what you can currently do is have a mix of a remote module [2] containing the application logic as Python functions, and a separate embedded module [3] containing the Java Kafka ingress and egresses.

So, concretely, your two modules will contain:

Remote module:
- Your Python functions implementing the main business logic.

Embedded module:
- A Java Kafka ingress with a deserializer that converts JSON to Protobuf messages. Here you have the freedom to extract only the fields that you need (see the sketch after this message for what that conversion could look like).
- A Java router [4] that routes those converted messages to the remote functions, by their logical address.
- A Java Kafka egress with a serializer that converts Protobuf messages from the remote functions into JSON Kafka records.
- A Java function that simply forwards input messages to the Kafka egress. If the remote functions need to write JSON messages to Kafka, they send a Protobuf message to this function.

Hope this helps.

Note that the egress side of things could definitely be easier (without the extra forwarding through a Java function) if the Python SDK's `kafka_egress_record` method allowed supplying arbitrary bytes; then you would already be able to write JSON messages to Kafka from the Python functions. This isn't supported yet, but technically it is quite easy to achieve. I've just filed an issue for this [5], in case you'd like to follow it.

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/io-module/apache-kafka.html#kafka-deserializer
[2] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/sdk/modules.html#remote-module
[3] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/sdk/modules.html#embedded-module
[4] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/io-module/index.html#router
[5] https://issues.apache.org/jira/browse/FLINK-18340
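To make that deserializer step concrete, below is a rough sketch of the JSON-to-Protobuf mapping it would perform. The real `KafkaIngressDeserializer` would be written in Java; the logic is shown in Python here only to match the rest of this thread, and it assumes a message_pb2 module generated with protoc from the message.proto shown earlier.

import json

from message_pb2 import MyRow  # assumed: generated by protoc from message.proto

def json_bytes_to_my_row(raw: bytes) -> MyRow:
    # Decode the raw Kafka record value as a JSON object.
    payload = json.loads(raw.decode('utf-8'))
    row = MyRow()
    # 'id' is guaranteed to exist; the remaining, schema-less fields
    # go into the google.protobuf.Struct field.
    row.id = payload.pop('id')
    row.message.update(payload)
    return row

A Java deserializer would do the equivalent with a JSON library and the Java class generated for MyRow, and the router would then typically use the converted message's id field as the target function id.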