
Writing multiple streams to multiple kafka

Posted by Deepak Jha on Mar 30, 2016; 6:19pm
URL: http://deprecated-apache-flink-mailing-list-archive.368.s1.nabble.com/Writing-multiple-streams-to-multiple-kafka-tp10967.html

Hi,
I'm building a pipeline in Flink with Kafka as both source and sink. As part
of this pipeline I have multiple stages in my run method, and I would like to
publish some substages' output to separate Kafka topics.
My question is: can I write the output of multiple stages of run to multiple Kafka topics?

private val env = StreamExecutionEnvironment.getExecutionEnvironment
private val src = env.addSource(Source.kafka(streams.abc.topic))

override def run(stream: DataStream[TypeX]): DataStream[String] = {

  val stage1 = stream
    .map(doA)
    .map(doB)
    .map(doC)

  val stage2 = stage1.map(doD)  // Returns (isTrue: Boolean, somethingElse: TypeT)

  val stage3 = stage2.filter(_.isTrue)
  stage3.addSink(Write_To_Kafka_Topic_Y)  // Can I do this outside the run method?

  val stage4 = stage2.filter(!_.isTrue)

  stage4.map(_.toString)
}

run(src).addSink(Write_To_Kafka_Topic_X)


Ideally I would prefer not to call the addSink method inside run (as in the
commented line above).
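One shape this could take (a minimal plain-Scala sketch only, using Lists and a
made-up Out case class in place of Flink's DataStream and the post's TypeT, with
hypothetical stage functions): have run return both streams, so every addSink
call stays at the call site.

```scala
// Hypothetical sketch: instead of calling addSink inside run, return the
// side stream alongside the main one; sinks then attach outside run.
// List stands in for DataStream; isTrue mirrors the post's stage2 flag.
case class Out(isTrue: Boolean, payload: String)

def run(stream: List[Int]): (List[String], List[Out]) = {
  // stage2: tag each element, analogous to the doD step in the post
  val stage2 = stream.map(x => Out(isTrue = x % 2 == 0, payload = x.toString))
  val stage3 = stage2.filter(_.isTrue)     // would go to Kafka topic Y
  val stage4 = stage2.filterNot(_.isTrue)  // would go to Kafka topic X
  (stage4.map(_.payload), stage3)
}

val (toTopicX, toTopicY) = run(List(1, 2, 3, 4))
// toTopicX == List("1", "3"); toTopicY holds the isTrue elements
```

Outside run, each returned stream would then get its own sink attached, which
keeps run free of any Kafka-specific calls.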
--
Thanks,
Deepak Jha