Hi Devs,
I just started using Flink and would like to ass kafka as Sink. I went through the documentation but so far I've not succeeded in writing to Kafka from Flink.... I' building application in Scala.... Here is my code snippet case class *Demo*(city: String, country: String, zipcode: Int) The map stage returns an instance of Demo type val env = StreamExecutionEnvironment.getExecutionEnvironment val properties = new Properties() properties.setProperty("bootstrap.servers", "127.0.0.1:9092") properties.setProperty("zookeeper.connect", "127.0.0.1:2181") properties.setProperty("group.id", "test_topic") val mapToDemo: String => Demo = {//Implementation} val stream = env.addSource(new FlinkKafkaConsumer082[String]("test", new SimpleStringSchema, properties)) stream.map(mapToDemo).addSink(new FlinkKafkaProducer[Demo]("127.0.0.1:9092", "test_topic", new SimpleStringSchema())) Can anyone explain me what am I doing wrong in adding Kafka as Sink ? -- Thanks, Deepak Jha |
Hi,
did you get any error message while executing the job? I don't think you can serialize the "Demo" type with the "SimpleStringSchema". On Fri, Jan 22, 2016 at 8:13 PM, Deepak Jha <[hidden email]> wrote: > Hi Devs, > I just started using Flink and would like to ass kafka as Sink. I went > through the documentation but so far I've not succeeded in writing to Kafka > from Flink.... > > I' building application in Scala.... Here is my code snippet > > case class *Demo*(city: String, country: String, zipcode: Int) > > The map stage returns an instance of Demo type > > val env = StreamExecutionEnvironment.getExecutionEnvironment > > val properties = new Properties() > > properties.setProperty("bootstrap.servers", "127.0.0.1:9092") > properties.setProperty("zookeeper.connect", "127.0.0.1:2181") > properties.setProperty("group.id", "test_topic") > val mapToDemo: String => Demo = {//Implementation} > > val stream = env.addSource(new FlinkKafkaConsumer082[String]("test", new > SimpleStringSchema, properties)) > > stream.map(mapToDemo).addSink(new FlinkKafkaProducer[Demo]("127.0.0.1:9092 > ", > "test_topic", new SimpleStringSchema())) > > Can anyone explain me what am I doing wrong in adding Kafka as Sink ? > -- > Thanks, > Deepak Jha > |
Hi Robert,
No it was compile time issue. Actually i tried to write a string as well but it did not work for me. Just for clarity my flink-connector-kafka version is 0.10.1.... I was able to fix the issue... SimpleStringSchema should be replaced with JavaDefaultStringSchema as the later is doing conversion from String to Byte Array. Thanks for the help though. On Fri, Jan 22, 2016 at 1:34 PM, Robert Metzger <[hidden email]> wrote: > Hi, > > did you get any error message while executing the job? I don't think you > can serialize the "Demo" type with the "SimpleStringSchema". > > > > > On Fri, Jan 22, 2016 at 8:13 PM, Deepak Jha <[hidden email]> wrote: > > > Hi Devs, > > I just started using Flink and would like to ass kafka as Sink. I went > > through the documentation but so far I've not succeeded in writing to > Kafka > > from Flink.... > > > > I' building application in Scala.... Here is my code snippet > > > > case class *Demo*(city: String, country: String, zipcode: Int) > > > > The map stage returns an instance of Demo type > > > > val env = StreamExecutionEnvironment.getExecutionEnvironment > > > > val properties = new Properties() > > > > properties.setProperty("bootstrap.servers", "127.0.0.1:9092") > > properties.setProperty("zookeeper.connect", "127.0.0.1:2181") > > properties.setProperty("group.id", "test_topic") > > val mapToDemo: String => Demo = {//Implementation} > > > > val stream = env.addSource(new FlinkKafkaConsumer082[String]("test", new > > SimpleStringSchema, properties)) > > > > stream.map(mapToDemo).addSink(new FlinkKafkaProducer[Demo](" > 127.0.0.1:9092 > > ", > > "test_topic", new SimpleStringSchema())) > > > > Can anyone explain me what am I doing wrong in adding Kafka as Sink ? > > -- > > Thanks, > > Deepak Jha > > > -- Thanks, Deepak Jha |
Free forum by Nabble | Edit this page |