Any python example with json data from Kafka using flink-statefun

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Any python example with json data from Kafka using flink-statefun

Sunil Sattiraju
Hi, Based on the example from https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greeter-example I am trying to ingest json data in kafka, but unable to achieve based on the examples.

event-generator.py

def produce():
    request = {}
    request['id'] = "abc-123"
    request['field1'] = "field1-1"
    request['field2'] = "field2-2"
    request['field3'] = "field3-3"
    if len(sys.argv) == 2:
        delay_seconds = int(sys.argv[1])
    else:
        delay_seconds = 1
    producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
    for request in random_requests_dict():
        producer.send(topic='test-topic',
                      value=json.dumps(request).encode('utf-8'))
        producer.flush()
        time.sleep(delay_seconds)

Below is the proto definition of the json data ( i dont always know all the fields, but i know id fields definitely exists)
message.proto

message MyRow {
    string id = 1;
    google.protobuf.Struct message = 2;
}

Below is greeter that received the data
tokenizer.py ( same like greeter.py saving state of id instead of counting )


@app.route('/statefun', methods=['POST'])
def handle():
    my_row = MyRow()
    data = my_row.ParseFromString(request.data) // Is this the right way to do it?
    response_data = handler(request.data)
    response = make_response(response_data)
    response.headers.set('Content-Type', 'application/octet-stream')
    return response


but, below is the error message. I am a newbie with proto and appreciate any help

11:55:17,996 tokenizer ERROR Exception on /statefun [POST]
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/app/tokenizer.py", line 101, in handle
    response_data = handler(data)
  File "/usr/local/lib/python3.8/site-packages/statefun/request_reply.py", line 38, in __call__
    request.ParseFromString(request_bytes)
  File "/usr/local/lib/python3.8/site-packages/google/protobuf/message.py", line 199, in ParseFromString
    return self.MergeFromString(serialized)
  File "/usr/local/lib/python3.8/site-packages/google/protobuf/internal/python_message.py", line 1131, in MergeFromString
    serialized = memoryview(serialized)
TypeError: memoryview: a bytes-like object is required, not 'int'

Reply | Threaded
Open this post in threaded view
|

Re: Any python example with json data from Kafka using flink-statefun

Igal Shilman
Hi,

The values must be valid encoded Protobuf messages [1], while in your
attached code snippet you are sending utf-8 encoded JSON strings.
You can take a look at this example with a generator that produces Protobuf
messages [2][3]

[1] https://developers.google.com/protocol-buffers/docs/pythontutorial
[2]
https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/generator/event-generator.py#L37
[3]
https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/greeter/messages.proto#L25

On Mon, Jun 15, 2020 at 4:25 PM Sunil Sattiraju <[hidden email]>
wrote:

> Hi, Based on the example from
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greeter-example
> I am trying to ingest json data in kafka, but unable to achieve based on
> the examples.
>
> event-generator.py
>
> def produce():
>     request = {}
>     request['id'] = "abc-123"
>     request['field1'] = "field1-1"
>     request['field2'] = "field2-2"
>     request['field3'] = "field3-3"
>     if len(sys.argv) == 2:
>         delay_seconds = int(sys.argv[1])
>     else:
>         delay_seconds = 1
>     producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
>     for request in random_requests_dict():
>         producer.send(topic='test-topic',
>                       value=json.dumps(request).encode('utf-8'))
>         producer.flush()
>         time.sleep(delay_seconds)
>
> Below is the proto definition of the json data ( i dont always know all
> the fields, but i know id fields definitely exists)
> message.proto
>
> message MyRow {
>     string id = 1;
>     google.protobuf.Struct message = 2;
> }
>
> Below is greeter that received the data
> tokenizer.py ( same like greeter.py saving state of id instead of counting
> )
>
>
> @app.route('/statefun', methods=['POST'])
> def handle():
>     my_row = MyRow()
>     data = my_row.ParseFromString(request.data) // Is this the right way
> to do it?
>     response_data = handler(request.data)
>     response = make_response(response_data)
>     response.headers.set('Content-Type', 'application/octet-stream')
>     return response
>
>
> but, below is the error message. I am a newbie with proto and appreciate
> any help
>
> 11:55:17,996 tokenizer ERROR Exception on /statefun [POST]
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2447,
> in wsgi_app
>     response = self.full_dispatch_request()
>   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1952,
> in full_dispatch_request
>     rv = self.handle_user_exception(e)
>   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1821,
> in handle_user_exception
>     reraise(exc_type, exc_value, tb)
>   File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39,
> in reraise
>     raise value
>   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1950,
> in full_dispatch_request
>     rv = self.dispatch_request()
>   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1936,
> in dispatch_request
>     return self.view_functions[rule.endpoint](**req.view_args)
>   File "/app/tokenizer.py", line 101, in handle
>     response_data = handler(data)
>   File "/usr/local/lib/python3.8/site-packages/statefun/request_reply.py",
> line 38, in __call__
>     request.ParseFromString(request_bytes)
>   File
> "/usr/local/lib/python3.8/site-packages/google/protobuf/message.py", line
> 199, in ParseFromString
>     return self.MergeFromString(serialized)
>   File
> "/usr/local/lib/python3.8/site-packages/google/protobuf/internal/python_message.py",
> line 1131, in MergeFromString
>     serialized = memoryview(serialized)
> TypeError: memoryview: a bytes-like object is required, not 'int'
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Any python example with json data from Kafka using flink-statefun

Sunil Sattiraju
Thanks Igal,
I dont have control over the data source inside kafka ( current kafka topic contains either json or avro formats only, i am trying to reproduce this scenario using my test data generator ).

is it possible to convert the json to proto at the receiving end of statefun applicaiton?

On 2020/06/15 14:51:01, Igal Shilman <[hidden email]> wrote:

> Hi,
>
> The values must be valid encoded Protobuf messages [1], while in your
> attached code snippet you are sending utf-8 encoded JSON strings.
> You can take a look at this example with a generator that produces Protobuf
> messages [2][3]
>
> [1] https://developers.google.com/protocol-buffers/docs/pythontutorial
> [2]
> https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/generator/event-generator.py#L37
> [3]
> https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/greeter/messages.proto#L25
>
> On Mon, Jun 15, 2020 at 4:25 PM Sunil Sattiraju <[hidden email]>
> wrote:
>
> > Hi, Based on the example from
> > https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greeter-example
> > I am trying to ingest json data in kafka, but unable to achieve based on
> > the examples.
> >
> > event-generator.py
> >
> > def produce():
> >     request = {}
> >     request['id'] = "abc-123"
> >     request['field1'] = "field1-1"
> >     request['field2'] = "field2-2"
> >     request['field3'] = "field3-3"
> >     if len(sys.argv) == 2:
> >         delay_seconds = int(sys.argv[1])
> >     else:
> >         delay_seconds = 1
> >     producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
> >     for request in random_requests_dict():
> >         producer.send(topic='test-topic',
> >                       value=json.dumps(request).encode('utf-8'))
> >         producer.flush()
> >         time.sleep(delay_seconds)
> >
> > Below is the proto definition of the json data ( i dont always know all
> > the fields, but i know id fields definitely exists)
> > message.proto
> >
> > message MyRow {
> >     string id = 1;
> >     google.protobuf.Struct message = 2;
> > }
> >
> > Below is greeter that received the data
> > tokenizer.py ( same like greeter.py saving state of id instead of counting
> > )
> >
> >
> > @app.route('/statefun', methods=['POST'])
> > def handle():
> >     my_row = MyRow()
> >     data = my_row.ParseFromString(request.data) // Is this the right way
> > to do it?
> >     response_data = handler(request.data)
> >     response = make_response(response_data)
> >     response.headers.set('Content-Type', 'application/octet-stream')
> >     return response
> >
> >
> > but, below is the error message. I am a newbie with proto and appreciate
> > any help
> >
> > 11:55:17,996 tokenizer ERROR Exception on /statefun [POST]
> > Traceback (most recent call last):
> >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2447,
> > in wsgi_app
> >     response = self.full_dispatch_request()
> >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1952,
> > in full_dispatch_request
> >     rv = self.handle_user_exception(e)
> >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1821,
> > in handle_user_exception
> >     reraise(exc_type, exc_value, tb)
> >   File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39,
> > in reraise
> >     raise value
> >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1950,
> > in full_dispatch_request
> >     rv = self.dispatch_request()
> >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1936,
> > in dispatch_request
> >     return self.view_functions[rule.endpoint](**req.view_args)
> >   File "/app/tokenizer.py", line 101, in handle
> >     response_data = handler(data)
> >   File "/usr/local/lib/python3.8/site-packages/statefun/request_reply.py",
> > line 38, in __call__
> >     request.ParseFromString(request_bytes)
> >   File
> > "/usr/local/lib/python3.8/site-packages/google/protobuf/message.py", line
> > 199, in ParseFromString
> >     return self.MergeFromString(serialized)
> >   File
> > "/usr/local/lib/python3.8/site-packages/google/protobuf/internal/python_message.py",
> > line 1131, in MergeFromString
> >     serialized = memoryview(serialized)
> > TypeError: memoryview: a bytes-like object is required, not 'int'
> >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Any python example with json data from Kafka using flink-statefun

Sunil Sattiraju
checking to see if this is possible currently.
Read json data from kafka topic => process using statefun => write out to kafka in json format.

I could have a separate process to read the source json data convert to protobuf into another kafka topic but it sounds in-efficient.
e.g.
Read json data from kafka topic =>convert json to protobuf =>  process using statefun => write out to kafka in protobuf format.=> convert protobuf to json message

Appreciate any advice on how to process json messages using statefun , also if this is not possible in the current python sdk, can i do that using the java/scala sdk?

Thanks.

On 2020/06/15 15:34:39, Sunil Sattiraju <[hidden email]> wrote:

> Thanks Igal,
> I dont have control over the data source inside kafka ( current kafka topic contains either json or avro formats only, i am trying to reproduce this scenario using my test data generator ).
>
> is it possible to convert the json to proto at the receiving end of statefun applicaiton?
>
> On 2020/06/15 14:51:01, Igal Shilman <[hidden email]> wrote:
> > Hi,
> >
> > The values must be valid encoded Protobuf messages [1], while in your
> > attached code snippet you are sending utf-8 encoded JSON strings.
> > You can take a look at this example with a generator that produces Protobuf
> > messages [2][3]
> >
> > [1] https://developers.google.com/protocol-buffers/docs/pythontutorial
> > [2]
> > https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/generator/event-generator.py#L37
> > [3]
> > https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/greeter/messages.proto#L25
> >
> > On Mon, Jun 15, 2020 at 4:25 PM Sunil Sattiraju <[hidden email]>
> > wrote:
> >
> > > Hi, Based on the example from
> > > https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greeter-example
> > > I am trying to ingest json data in kafka, but unable to achieve based on
> > > the examples.
> > >
> > > event-generator.py
> > >
> > > def produce():
> > >     request = {}
> > >     request['id'] = "abc-123"
> > >     request['field1'] = "field1-1"
> > >     request['field2'] = "field2-2"
> > >     request['field3'] = "field3-3"
> > >     if len(sys.argv) == 2:
> > >         delay_seconds = int(sys.argv[1])
> > >     else:
> > >         delay_seconds = 1
> > >     producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
> > >     for request in random_requests_dict():
> > >         producer.send(topic='test-topic',
> > >                       value=json.dumps(request).encode('utf-8'))
> > >         producer.flush()
> > >         time.sleep(delay_seconds)
> > >
> > > Below is the proto definition of the json data ( i dont always know all
> > > the fields, but i know id fields definitely exists)
> > > message.proto
> > >
> > > message MyRow {
> > >     string id = 1;
> > >     google.protobuf.Struct message = 2;
> > > }
> > >
> > > Below is greeter that received the data
> > > tokenizer.py ( same like greeter.py saving state of id instead of counting
> > > )
> > >
> > >
> > > @app.route('/statefun', methods=['POST'])
> > > def handle():
> > >     my_row = MyRow()
> > >     data = my_row.ParseFromString(request.data) // Is this the right way
> > > to do it?
> > >     response_data = handler(request.data)
> > >     response = make_response(response_data)
> > >     response.headers.set('Content-Type', 'application/octet-stream')
> > >     return response
> > >
> > >
> > > but, below is the error message. I am a newbie with proto and appreciate
> > > any help
> > >
> > > 11:55:17,996 tokenizer ERROR Exception on /statefun [POST]
> > > Traceback (most recent call last):
> > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2447,
> > > in wsgi_app
> > >     response = self.full_dispatch_request()
> > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1952,
> > > in full_dispatch_request
> > >     rv = self.handle_user_exception(e)
> > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1821,
> > > in handle_user_exception
> > >     reraise(exc_type, exc_value, tb)
> > >   File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39,
> > > in reraise
> > >     raise value
> > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1950,
> > > in full_dispatch_request
> > >     rv = self.dispatch_request()
> > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1936,
> > > in dispatch_request
> > >     return self.view_functions[rule.endpoint](**req.view_args)
> > >   File "/app/tokenizer.py", line 101, in handle
> > >     response_data = handler(data)
> > >   File "/usr/local/lib/python3.8/site-packages/statefun/request_reply.py",
> > > line 38, in __call__
> > >     request.ParseFromString(request_bytes)
> > >   File
> > > "/usr/local/lib/python3.8/site-packages/google/protobuf/message.py", line
> > > 199, in ParseFromString
> > >     return self.MergeFromString(serialized)
> > >   File
> > > "/usr/local/lib/python3.8/site-packages/google/protobuf/internal/python_message.py",
> > > line 1131, in MergeFromString
> > >     serialized = memoryview(serialized)
> > > TypeError: memoryview: a bytes-like object is required, not 'int'
> > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: Any python example with json data from Kafka using flink-statefun

Tzu-Li (Gordon) Tai
(forwarding this to user@ as it is more suited to be located there)

Hi Sunil,

With remote functions (using the Python SDK), messages sent to / from them
must be Protobuf messages.
This is a requirement since remote functions can be written in any
language, and we use Protobuf as a means for cross-language messaging.
If you are defining Kafka ingresses in a remote module (via textual YAML
module configs), then records in the Kafka ingress will be directly routed
to the remote functions, and therefore they are required to be Protobuf
messages as well.

With embedded functions (using the current Java SDK), then what you are
trying to do is possible.
When using the Java SDK, the Kafka ingress allows providing a
`KafkaIngressDeserializer` [1], where you can convert the bytes in Kafka
into any type you intend for messaging within the StateFun application. So
there, you can convert your JSON records.

If you want to still write your main application logic in Python, but the
input and output messages in Kafka are required to be JSON,
what you can currently do is have a mix of remote module [2] containing the
application logic as Python functions,
and a separate embedded module [3] containing the Java Kafka ingress and
egresses.
So, concretely, your 2 modules will contain:

Remote module:
- Your Python functions implementing the main business logic.

Embedded module:
- Java Kafka ingress with deserializer that converts JSON to Protobuf
messages. Here you have the freedom to extract only the fields that you
need.
- A Java router [4] that routes those converted messages to the remote
functions, by their logical address
- A Java Kafka egress with serializer that converts Protobuf messages from
remote functions into JSON Kafka records.
- A Java function that simply forwards input messages to the Kafka Kafka
egress. If the remote functions need to write JSON messages to Kafka, they
send a Protobuf message to this function.


Hope this helps.
Note that the egress side of things can definitely be easier (without the
extra forwarding through a Java function) if the Python SDK's
`kafka_egress_record` method allows supplying arbitrary bytes.
Then you would be able to already write to Kafka JSON messages in the
Python functions.
This however isn't supported yet, but technically it is quite easy to
achieve. I've just filed a issue for this [5], in case you'd like to follow
that.

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/io-module/apache-kafka.html#kafka-deserializer
[2]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/sdk/modules.html#remote-module

[3]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/sdk/modules.html#embedded-module
[4]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/io-module/index.html#router
[5] https://issues.apache.org/jira/browse/FLINK-18340

On Wed, Jun 17, 2020 at 9:25 AM Sunil <[hidden email]> wrote:

> checking to see if this is possible currently.
> Read json data from kafka topic => process using statefun => write out to
> kafka in json format.
>
> I could have a separate process to read the source json data convert to
> protobuf into another kafka topic but it sounds in-efficient.
> e.g.
> Read json data from kafka topic =>convert json to protobuf =>  process
> using statefun => write out to kafka in protobuf format.=> convert protobuf
> to json message
>
> Appreciate any advice on how to process json messages using statefun ,
> also if this is not possible in the current python sdk, can i do that using
> the java/scala sdk?
>
> Thanks.
>
> On 2020/06/15 15:34:39, Sunil Sattiraju <[hidden email]> wrote:
> > Thanks Igal,
> > I dont have control over the data source inside kafka ( current kafka
> topic contains either json or avro formats only, i am trying to reproduce
> this scenario using my test data generator ).
> >
> > is it possible to convert the json to proto at the receiving end of
> statefun applicaiton?
> >
> > On 2020/06/15 14:51:01, Igal Shilman <[hidden email]> wrote:
> > > Hi,
> > >
> > > The values must be valid encoded Protobuf messages [1], while in your
> > > attached code snippet you are sending utf-8 encoded JSON strings.
> > > You can take a look at this example with a generator that produces
> Protobuf
> > > messages [2][3]
> > >
> > > [1] https://developers.google.com/protocol-buffers/docs/pythontutorial
> > > [2]
> > >
> https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/generator/event-generator.py#L37
> > > [3]
> > >
> https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/greeter/messages.proto#L25
> > >
> > > On Mon, Jun 15, 2020 at 4:25 PM Sunil Sattiraju <
> [hidden email]>
> > > wrote:
> > >
> > > > Hi, Based on the example from
> > > >
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greeter-example
> > > > I am trying to ingest json data in kafka, but unable to achieve
> based on
> > > > the examples.
> > > >
> > > > event-generator.py
> > > >
> > > > def produce():
> > > >     request = {}
> > > >     request['id'] = "abc-123"
> > > >     request['field1'] = "field1-1"
> > > >     request['field2'] = "field2-2"
> > > >     request['field3'] = "field3-3"
> > > >     if len(sys.argv) == 2:
> > > >         delay_seconds = int(sys.argv[1])
> > > >     else:
> > > >         delay_seconds = 1
> > > >     producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
> > > >     for request in random_requests_dict():
> > > >         producer.send(topic='test-topic',
> > > >                       value=json.dumps(request).encode('utf-8'))
> > > >         producer.flush()
> > > >         time.sleep(delay_seconds)
> > > >
> > > > Below is the proto definition of the json data ( i dont always know
> all
> > > > the fields, but i know id fields definitely exists)
> > > > message.proto
> > > >
> > > > message MyRow {
> > > >     string id = 1;
> > > >     google.protobuf.Struct message = 2;
> > > > }
> > > >
> > > > Below is greeter that received the data
> > > > tokenizer.py ( same like greeter.py saving state of id instead of
> counting
> > > > )
> > > >
> > > >
> > > > @app.route('/statefun', methods=['POST'])
> > > > def handle():
> > > >     my_row = MyRow()
> > > >     data = my_row.ParseFromString(request.data) // Is this the right
> way
> > > > to do it?
> > > >     response_data = handler(request.data)
> > > >     response = make_response(response_data)
> > > >     response.headers.set('Content-Type', 'application/octet-stream')
> > > >     return response
> > > >
> > > >
> > > > but, below is the error message. I am a newbie with proto and
> appreciate
> > > > any help
> > > >
> > > > 11:55:17,996 tokenizer ERROR Exception on /statefun [POST]
> > > > Traceback (most recent call last):
> > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line
> 2447,
> > > > in wsgi_app
> > > >     response = self.full_dispatch_request()
> > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line
> 1952,
> > > > in full_dispatch_request
> > > >     rv = self.handle_user_exception(e)
> > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line
> 1821,
> > > > in handle_user_exception
> > > >     reraise(exc_type, exc_value, tb)
> > > >   File "/usr/local/lib/python3.8/site-packages/flask/_compat.py",
> line 39,
> > > > in reraise
> > > >     raise value
> > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line
> 1950,
> > > > in full_dispatch_request
> > > >     rv = self.dispatch_request()
> > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line
> 1936,
> > > > in dispatch_request
> > > >     return self.view_functions[rule.endpoint](**req.view_args)
> > > >   File "/app/tokenizer.py", line 101, in handle
> > > >     response_data = handler(data)
> > > >   File
> "/usr/local/lib/python3.8/site-packages/statefun/request_reply.py",
> > > > line 38, in __call__
> > > >     request.ParseFromString(request_bytes)
> > > >   File
> > > > "/usr/local/lib/python3.8/site-packages/google/protobuf/message.py",
> line
> > > > 199, in ParseFromString
> > > >     return self.MergeFromString(serialized)
> > > >   File
> > > >
> "/usr/local/lib/python3.8/site-packages/google/protobuf/internal/python_message.py",
> > > > line 1131, in MergeFromString
> > > >     serialized = memoryview(serialized)
> > > > TypeError: memoryview: a bytes-like object is required, not 'int'
> > > >
> > > >
> > >
> >
>