Hi,
I'm building a pipeline with Flink, using Kafka as both source and sink. The pipeline has multiple stages inside my run method, and I would like to publish the output of some substages to a separate Kafka topic. My question: can I write multiple stages of run to multiple Kafka topics?

private val env = StreamExecutionEnvironment.getExecutionEnvironment
private val src = env.addSource(Source.kafka(streams.abc.topic))

override def run(stream: DataStream[TypeX]) = {

  val stage1 = stream
    .map(doA)
    .map(doB)
    .map(doC)

  val stage2 = stage1.map(doD) // Returns (isTrue: Boolean, somethingElse: TypeT)

  val stage3 = stage2.filter(_.isTrue)
  stage3.addSink(Write_To_Kafka_Topic_Y) // Can I do this outside the run method?
  val stage4 = stage2.filter(!_.isTrue)

  stage4.map(_.toString)
}

run(src).addSink(Write_To_Kafka_Topic_X)

Ideally I would prefer not to call addSink inside run (see the commented lines above).
--
Thanks,
Deepak Jha
Hi,
yes, you can output the stages to several different Kafka topics. If you don't want to call addSink inside the run() method, you have to return a handle to your stage3 DataStream somehow, for example:

private val env = StreamExecutionEnvironment.getExecutionEnvironment
private val src = env.addSource(Source.kafka(streams.abc.topic))

override def run(stream: DataStream[TypeX]) = {

  val stage1 = stream
    .map(doA)
    .map(doB)
    .map(doC)

  val stage2 = stage1.map(doD) // Returns (isTrue: Boolean, somethingElse: TypeT)

  val stage3 = stage2.filter(_.isTrue)
  val stage4 = stage2.filter(!_.isTrue)

  (stage3, stage4.map(_.toString)) // return both stages
}

val (stage3, stage4) = run(src)
stage3.addSink(Write_To_Kafka_Topic_Y)
stage4.addSink(Write_To_Kafka_Topic_X)

On Wed, 30 Mar 2016 at 20:19 Deepak Jha <[hidden email]> wrote:
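The essence of the suggestion is a plain language-level pattern: have run() return both branches as a tuple, and let the caller attach the sinks. Here is a minimal, self-contained sketch of that pattern using ordinary Scala collections in place of DataStreams (no Flink dependency; the Result class, the even/odd predicate, and the println "sinks" are made up for illustration only):

```scala
// run() produces both branches of a split; the caller, not run(),
// decides where each branch is written.
object SplitPipeline {
  final case class Result(isTrue: Boolean, payload: Int)

  // Stand-in for the Flink job: stage2 tags each element, then the
  // stream is split on the tag, mirroring filter / filterNot in the thread.
  def run(stream: List[Int]): (List[Result], List[String]) = {
    val stage2 = stream.map(x => Result(x % 2 == 0, x))
    val stage3 = stage2.filter(_.isTrue)
    val stage4 = stage2.filterNot(_.isTrue).map(_.payload.toString)
    (stage3, stage4) // return both stages, as in the reply above
  }

  def main(args: Array[String]): Unit = {
    val (evens, odds) = run(List(1, 2, 3, 4))
    // The "sinks" are attached outside run(); here they are just println.
    println(evens.map(_.payload)) // List(2, 4)
    println(odds)                 // List(1, 3)
  }
}
```

In the real Flink job the tuple elements would be DataStreams and the two println calls would be addSink(...) calls on the respective Kafka producers; the structure is otherwise the same.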
It works... Thanks!

On Thu, Mar 31, 2016 at 2:23 AM, Aljoscha Krettek <[hidden email]> wrote:
--
Thanks,
Deepak Jha