I am new to Apache Beam and Spring Cloud Data Flow, and I am trying to integrate Apache Beam into Spring Cloud Data Flow. How can I use spring-kafka messages as a source in a Beam pipeline? How can I add spring-kafka as a sink in a Beam pipeline? I want the pipeline to run continuously until it finishes. Please suggest how I can integrate the two.

Example (wordcount):

    PipelineOptions options = PipelineOptionsFactory.create();

    Pipeline p = Pipeline.create(options);

    // Instead of TextIO.read().from(...), I want to read from the
    // message channel INPUT in Spring Cloud Data Flow.
    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
     .apply(FlatMapElements
         .into(TypeDescriptors.strings())
         .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
     .apply(Filter.by((String word) -> !word.isEmpty()))
     .apply(Count.<String>perElement())
     .apply(MapElements
         .into(TypeDescriptors.strings())
         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
     // Instead of TextIO.write().to(...), I want to send the result to
     // the message channel OUTPUT.
     .apply(TextIO.write().to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));

    p.run().waitUntilFinish();

--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
Hi,

You wrote to the Apache Flink development mailing list. I think your question should go to the Apache Beam user mailing list: [hidden email]

Best, Fabian

2018-02-22 14:35 GMT+01:00 shankara <[hidden email]>:

> I am new to Apache Beam and Spring Cloud Data Flow. I am trying to integrate
> Apache Beam into Spring Cloud Data Flow. How can I use spring-kafka messages as a
> source in a Beam pipeline? How can I add spring-kafka as a sink in a Beam
> pipeline?
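
For reference while the question is moved to the Beam list: the steps between the read and the write in the wordcount example do not depend on where the lines come from, so they can be checked in isolation before any Kafka source or sink is wired in. Below is a minimal plain-Java sketch of what those middle steps compute for a batch of messages. It is not Beam code; the class name `WordCountSketch` and its methods are hypothetical, chosen only for illustration. Note also that Beam's Java SDK ships its own Kafka connector (`KafkaIO`), and that an unbounded source would most likely need a windowing step before `Count.perElement()`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper class; not part of Beam or Spring Cloud Data Flow.
class WordCountSketch {

    // Mirrors FlatMapElements + Filter + Count.perElement():
    // split each line on runs of non-letters, drop empty tokens, count per word.
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split("[^\\p{L}]+")))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.groupingBy(
                        Function.identity(), TreeMap::new, Collectors.counting()));
    }

    // Mirrors the MapElements step: format each entry as "word: count".
    static List<String> format(Map<String, Long> counts) {
        return counts.entrySet().stream()
                .map(e -> e.getKey() + ": " + e.getValue())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Stand-in for messages that would arrive on the INPUT channel.
        List<String> messages = Arrays.asList("to be, or not to be", "that is the question");
        // Stand-in for sending results to the OUTPUT channel.
        format(countWords(messages)).forEach(System.out::println);
    }
}
```

The `TreeMap` is used only to make the output order deterministic in this sketch; a Beam `PCollection` gives no ordering guarantee.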