Writing multiple streams to multiple kafka
Posted by Deepak Jha on Mar 30, 2016; 6:19pm
URL: http://deprecated-apache-flink-mailing-list-archive.368.s1.nabble.com/Writing-multiple-streams-to-multiple-kafka-tp10967.html
Hi,
I'm building a pipeline with Flink, using Kafka as both source and sink. As part
of this pipeline I have multiple stages in my run method, and I would
like to publish the output of some intermediate stages to separate Kafka topics.
My question is: can I write multiple stages of run to multiple Kafka topics?
private val env = StreamExecutionEnvironment.getExecutionEnvironment
private val src = env.addSource(Source.kafka(streams.abc.topic))

override def run(stream: DataStream[TypeX]): DataStream[String] = {
  val stage1 = stream
    .map(doA)
    .map(doB)
    .map(doC)
  val stage2 = stage1.map(doD) // returns (isTrue: Boolean, somethingElse: TypeT)
  val stage3 = stage2.filter(_.isTrue)
  stage3.addSink(Write_To_Kafka_Topic_Y) // can I do this outside the run method?
  val stage4 = stage2.filter(!_.isTrue)
  stage4.map(_.toString)
}

run(src).addSink(Write_To_Kafka_Topic_X)
Ideally I would prefer not to call the addSink method inside run (as on
the marked line above).
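For reference, a minimal sketch of the pattern being asked about: one job whose stream is split by a predicate, with each branch getting its own Kafka sink. This assumes Flink's Kafka 0.9 connector; the broker address, topic names, and the toy `fromElements` source are illustrative placeholders, not part of the original pipeline:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object MultiSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-in for the real Kafka source.
    val src: DataStream[String] = env.fromElements("a:1", "b:2", "a:3")

    // Split one stream into two branches with complementary filters.
    val matched   = src.filter(_.startsWith("a"))
    val unmatched = src.filter(!_.startsWith("a"))

    // A sink is just another operator on a DataStream, so nothing forces
    // every addSink call to live inside the same method as the
    // transformations; each branch can be sunk wherever the stream
    // reference is in scope.
    matched.addSink(new FlinkKafkaProducer09[String](
      "localhost:9092", "topic-y", new SimpleStringSchema()))
    unmatched.addSink(new FlinkKafkaProducer09[String](
      "localhost:9092", "topic-x", new SimpleStringSchema()))

    env.execute("multi-sink sketch")
  }
}
```

In other words, a single Flink job graph can fan out to any number of sinks; a method like run can return an intermediate DataStream and the caller can still attach further sinks to it outside.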
--
Thanks,
Deepak Jha