Hello,
I just started learning Flink (using Scala) recently, and I developed a job that, in short, does these steps:

- Reads JSON messages from Kafka
- Enriches the messages, reading data from Cassandra (using Phantom DSL)
- Puts the enriched messages back onto another Kafka topic

The job looks like this:

    env
      .addSource(new FlinkKafkaConsumer09[String](...))
      .map(MyService.enrichMessage _) // Returns Option
      .filter(!_.isEmpty)
      .map(_.get)
      .map(enrichedMessageToJsonMapper)
      .addSink(new FlinkKafkaProducer09[String](...))

The "enrichMessage" method is where I'm using Phantom DSL to query Cassandra. I would like to return a Future there, but I can't figure out a way to do it right now, so I'm using "Await" to force the resolution and return a result. Is there a way to use a Future here?

I do have a second job that updates the data in Cassandra, and since I don't need a sink there, I can have my map return the Futures and everything happens asynchronously. I would like to know if it's possible to have similar behavior when I want to use a sink (that is, sink to Kafka as the Futures complete).

BTW, I'm using Flink 1.1.2 with Scala 2.11.

Thanks a lot for your help!

Kind regards,

Albert
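(For illustration, a minimal sketch of the blocking workaround described above; the message type, the enrichMessageAsync stub, and the 5-second timeout are assumptions, not the actual code:)

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._

    object MyService {
      // Stand-in for the Phantom DSL query against Cassandra.
      def enrichMessageAsync(msg: String): Future[Option[String]] =
        Future.successful(Some(msg))

      // Blocks the task thread until the lookup resolves: the "Await"
      // workaround described in the question.
      def enrichMessage(msg: String): Option[String] =
        Await.result(enrichMessageAsync(msg), 5.seconds)
    }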
Hi Albert,
you cannot use Futures between operators, as objects are serialized and possibly sent through the cluster immediately. Right now there is no straightforward way in Flink to do async calls. However, there is a discussion going on which you might want to join [1]. As far as I know, the current solution is to create a FlatMap function manually which manages the async calls and emits the results [2].

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Proposal-for-Asynchronous-I-O-in-FLINK-td13497.html
[2] http://stackoverflow.com/questions/38866078/how-to-look-up-and-update-the-state-of-a-record-from-a-database-in-apache-flink

I hope that helps.

Timo

--
Kind Regards

Timo Walther

Follow me: @twalthr
https://www.linkedin.com/in/twalthr
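(To make the suggested pattern concrete, here is a rough sketch of such a FlatMap function, reusing the hypothetical enrichMessageAsync from the sketch above; the back-pressure cap is an assumption. The Collector is not thread-safe, so completed futures are drained from the task thread rather than emitted from callbacks; a failed lookup fails the task, and anything still in flight when a task fails is lost, a point raised later in this thread:)

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.util.Collector

    import scala.collection.mutable
    import scala.concurrent.duration.Duration
    import scala.concurrent.{Await, Future}

    class AsyncEnrichFlatMap(maxInFlight: Int = 50)
        extends RichFlatMapFunction[String, String] {

      // Futures fired by earlier records that have not been emitted yet.
      @transient private var inFlight: mutable.Queue[Future[Option[String]]] = _

      override def open(parameters: Configuration): Unit = {
        inFlight = mutable.Queue.empty
      }

      override def flatMap(msg: String, out: Collector[String]): Unit = {
        // Fire the asynchronous Cassandra lookup.
        inFlight.enqueue(MyService.enrichMessageAsync(msg))

        // Back-pressure: block on the oldest future once too many
        // calls are outstanding.
        if (inFlight.size >= maxInFlight) {
          Await.ready(inFlight.head, Duration.Inf)
        }

        // Emit whatever has completed, in order; the Collector is only
        // ever touched from the task thread.
        while (inFlight.nonEmpty && inFlight.head.isCompleted) {
          Await.result(inFlight.dequeue(), Duration.Zero).foreach(out.collect)
        }
      }
    }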
Hi Timo,
Thank you for the fast reply :) I will check and follow the discussion; it would be really cool to have that feature.

As for the updates, there I'm using Futures which update accumulators on success / failure, and everything seems to be working fine. I tested both in local mode and deployed on YARN with multiple task managers, although they were all running on the same machine. But this update job has no sinks; maybe that's why it's working.

Thanks again!

Cheers,

Albert
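(A sketch of how such an update job might look, reshaped so that the Future is fired as a side effect and never leaves the operator, which sidesteps the serialization issue Timo raised. MyService.updateAsync is hypothetical, and plain AtomicLongs stand in for the accumulators because the callbacks run on executor threads, not the task thread:)

    import java.util.concurrent.atomic.AtomicLong

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}

    class FireAndForgetUpdate extends RichMapFunction[String, String] {

      @transient private var successes: AtomicLong = _
      @transient private var failures: AtomicLong = _

      override def open(parameters: Configuration): Unit = {
        successes = new AtomicLong()
        failures = new AtomicLong()
      }

      override def map(msg: String): String = {
        // Hypothetical Phantom DSL write to Cassandra; the Future is
        // only used inside this operator, so nothing non-serializable
        // crosses an operator boundary.
        MyService.updateAsync(msg).onComplete {
          case Success(_) => successes.incrementAndGet()
          case Failure(_) => failures.incrementAndGet()
        }
        msg // pass the message through unchanged
      }
    }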
Hi Albert,
I think the futures may be lost if the machine goes down for some reason, so simply using futures may not be acceptable in production.

I hope this is helpful.

Thanks
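(In Flink 1.1, one way to guard against exactly this is to snapshot the inputs of the in-flight futures with the Checkpointed interface and re-fire them on restore, which gives at-least-once behavior: lookups that completed between the last checkpoint and the failure may be emitted twice. A rough sketch, again using the hypothetical enrichMessageAsync from earlier:)

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.checkpoint.Checkpointed
    import org.apache.flink.util.Collector

    import scala.collection.mutable
    import scala.concurrent.duration.Duration
    import scala.concurrent.{Await, Future}

    class CheckpointedAsyncEnrich
        extends RichFlatMapFunction[String, String]
        with Checkpointed[Array[String]] {

      // Raw inputs whose futures have not completed yet; this is what
      // gets snapshotted, so a restored task can re-fire the lookups.
      @transient private var pending: mutable.Queue[(String, Future[Option[String]])] = _

      override def open(parameters: Configuration): Unit = {
        if (pending == null) pending = mutable.Queue.empty
      }

      override def flatMap(msg: String, out: Collector[String]): Unit = {
        pending.enqueue(msg -> MyService.enrichMessageAsync(msg))
        // Emit completed lookups in order, from the task thread only.
        while (pending.nonEmpty && pending.head._2.isCompleted) {
          Await.result(pending.dequeue()._2, Duration.Zero).foreach(out.collect)
        }
      }

      override def snapshotState(checkpointId: Long, timestamp: Long): Array[String] =
        pending.map(_._1).toArray

      override def restoreState(state: Array[String]): Unit = {
        pending = mutable.Queue.empty
        // Re-fire the lookups that were in flight when the snapshot was taken.
        state.foreach(m => pending.enqueue(m -> MyService.enrichMessageAsync(m)))
      }
    }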