Zixuan Rao created FLINK-21211:
---------------------------------- Summary: Looking for reviews on a framework based on flink-statefun Key: FLINK-21211 URL: https://issues.apache.org/jira/browse/FLINK-21211 Project: Flink Issue Type: New Feature Reporter: Zixuan Rao Hi, I am currently developing a framework targeting back end state management. To ensure exactly-once processing of events in back end, I intend to use Flink Stateful Functions runtime in combination with Python's asyncio. I hope to receive some feedbacks. The following code shows an example (draft) of writing a back end micro service using my framework. It is intended to be equivalent (exchangeable) with Flink-stateful examples/ridesharing. The idea is that "Event" is reducible to an async function call, and external egress can be emitted by saving an object. This preserves the exactly-once features of Flink-statefun while adding a great deal of readability to the code. Reviews are appreciated. Thank you! ```python3 """ Equivalent implementation for flink stateful functions example - ridesharing ref: https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-ridesharing-example/statefun-ridesharing-example-functions/src/main/java/org/apache/flink/statefun/examples/ridesharing/FnDriver.java """ from onto.models.base import Serializable """ Rewrite callback-style code to async-await: ref: https://www.coreycleary.me/how-to-rewrite-a-callback-function-in-promise-form-and-async-await-form-in-javascript """ from onto.domain_model import DomainModel from onto.attrs import attrs class RideshareBase(DomainModel): pass class Passenger(RideshareBase): async def request_ride(self, start_geo_cell, end_geo_cell): r = Ride.create() # TODO: implement create await r.passenger_joins( passenger=self, start_geo_cell=start_geo_cell, end_geo_cell=end_geo_cell ) class PassengerMessage(DomainModel): passenger = attrs.relation('Passenger') class RideFailedMessage(Serializable): ride = attrs.relation('Ride') ride_failed = attrs.embed(RideFailedMessage).optional class DriverHasBeenFoundMessage(Serializable): driver = attrs.relation('Driver') driver_geo_cell = attrs.relation('GeoCell') driver_found = attrs.embed(RideFailedMessage).optional class RideHasStarted(Serializable): driver = attrs.relation('Driver') ride_started = attrs.embed(RideHasStarted).optional class RideHasEnded(Serializable): pass # TODO: make sure that empty class works ride_ended = attrs.embed(RideHasEnded).optional async def ride_failed(self, ride: 'Ride'): message = self.PassengerMessage.new( passenger=self, ride_failed=self.PassengerMessage.RideFailedMessage.new( ride=ride ) ) message.save() async def driver_joins_ride(self, driver: 'Driver', driver_geo_cell: 'GeoCell'): message = self.PassengerMessage.new( passenger=self, driver_found=self.PassengerMessage.DriverHasBeenFoundMessage.new( driver=driver, driver_geo_cell=driver_geo_cell ) ) message.save() async def ride_started(self, driver: 'Driver'): message = self.PassengerMessage.new( passenger=self, ride_started=self.PassengerMessage.RideHasStarted.new( driver=driver ) ) message.save() async def ride_ended(self): message = self.PassengerMessage.new( passenger=self, ride_started=self.PassengerMessage.RideHasEnded.new() ) message.save() class DriverRejectsPickupError(RideshareBase, Exception): driver = attrs.relation(dm_cls='Driver') ride = attrs.relation(dm_cls='Ride') class Driver(RideshareBase): is_taken: bool = attrs.required current_ride = attrs.relation(dm_cls='Ride').optional current_location: 'GeoCell' = attrs.relation(dm_cls='GeoCell') @is_taken.getter def is_taken(self): # TODO: make better return self.current_ride is not None async def pickup_passenger(self, ride: 'Ride', passenger: Passenger, passenger_start_cell: 'GeoCell', passenger_end_cell: 'GeoCell'): if self.is_taken: raise DriverRejectsPickupError(driver=self, ride=ride) self.current_ride = ride # " // We also need to unregister ourselves from the current geo cell we belong to." if geo_cell := self.current_location: await geo_cell.leave_cell(driver=self) await ride.driver_joins(driver=self, driver_location=self.current_location) message = self.DriverMessage.new( driver=self, pickup_passenger=self.DriverMessage.PickupPassengerMessage.new( passenger=passenger, start_geo_location=passenger_start_cell, end_geo_location=passenger_end_cell ) ) message.save() class DriverMessage(RideshareBase): driver = attrs.relation('Driver') class PickupPassengerMessage(Serializable): ride = attrs.relation('Ride') # TODO: maybe passenger_id start_geo_location = attrs.relation('GeoCell') end_geo_location = attrs.relation('GeoCell') pickup_passenger = attrs.embed(PickupPassengerMessage) async def ride_has_started(self): await self.current_ride.ride_started(driver=self, driver_geo_cell=self.current_location) async def ride_has_ended(self): await self.current_ride.ride_ended() async def location_is_updated(self, current_geo_cell: 'GeoCell'): # TODO: maybe switch to embed: final int updated = locationUpdate.getLocationUpdate().getCurrentGeoCell(); updated = current_geo_cell last = self.current_location if last is None: self.current_location = updated await updated.join_cell() return elif last == updated: return else: self.current_location = updated class Ride(RideshareBase): passenger = attrs.relation(dm_cls=Passenger) driver = attrs.relation(dm_cls=Driver) async def ride_started(self, driver: Driver, driver_geo_cell: 'GeoCell'): await self.passenger.ride_started(driver=driver) async def passenger_joins( self, passenger: Passenger, start_geo_cell: 'GeoCell', end_geo_cell: 'GeoCell' ): self.passenger = passenger MAX_RETRY = 5 # Ref: https://stackoverflow.com/a/7663441 for trial in range(MAX_RETRY): if driver := start_geo_cell.get_driver(): try: await driver.pickup_passenger( ride=self, passenger=passenger, passenger_start_cell=start_geo_cell, passenger_end_cell=end_geo_cell ) except DriverRejectsPickupError as _: # TODO: NOTE difference from java impl """ final int startGeoCell = passenger.get().getStartGeoCell(); String cellKey = String.valueOf(startGeoCell); context.send(FnGeoCell.TYPE, cellKey, GetDriver.getDefaultInstance()); """ continue # to retry else: break else: await passenger.ride_failed(ride=self) async def driver_joins(self, driver, driver_location): self.driver = driver await self.passenger.driver_joins_ride(driver=driver, driver_geo_cell=driver_location) async def ride_ended(self, ): await self.passenger.ride_ended() self.passenger = None self.driver = None class GeoCell(RideshareBase): drivers: list = attrs.list( value=attrs.relation(dm_cls=Driver) ) async def get_driver(self) -> Driver: if len(self.drivers) != 0: next_driver = self.drivers[0] return next_driver else: return None async def leave_cell(self, driver: Driver): self.drivers.remove(driver) async def add_driver(self): # TODO: mutated local variable vs mutated instance state; # may cause difference in behavior if self.drivers is None: self.drivers = list() self.drivers.append(Driver) join_cell = add_driver ``` -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |