[jira] [Created] (FLINK-18518) Add Async RequestReply handler for the Python SDK

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-18518) Add Async RequestReply handler for the Python SDK

Shang Yuanchun (Jira)
Igal Shilman created FLINK-18518:
------------------------------------

             Summary: Add Async RequestReply handler for the Python SDK
                 Key: FLINK-18518
                 URL: https://issues.apache.org/jira/browse/FLINK-18518
             Project: Flink
          Issue Type: Improvement
          Components: Stateful Functions
    Affects Versions: statefun-2.1.0
            Reporter: Igal Shilman


I/O bound stateful functions can benefit from the built-in async/io support in Python, but the 

RequestReply handler is not an async-io compatible.  See [this|https://stackoverflow.com/questions/62640283/flink-stateful-functions-async-calls-with-the-python-sdk] question on stackoverflow.

 

Having an asyncio compatible handler will open the door to the usage of aiohttp for example:

 
{code:java}
import aiohttp
import asyncio

...

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

@function.bind("example/hello")
async def hello(context, message):
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://python.org')
        context.pack_and_reply(SomeProtobufMessage(html))


from aiohttp import webhandler

handler = AsyncRequestReplyHandler(functions)

async def handle(request):
    req = await request.read()
    res = await handler(req)
    return web.Response(body=res, content_type="application/octet-stream'")

app = web.Application()
app.add_routes([web.post('/statefun', handle)])
if __name__ == '__main__':
    web.run_app(app, port=5000)
 {code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)