Kafka As Sink in Flink Streaming


Kafka As Sink in Flink Streaming

Deepak Jha
Hi Devs,
I just started using Flink and would like to add Kafka as a sink. I went
through the documentation, but so far I've not succeeded in writing to Kafka
from Flink.

I'm building the application in Scala. Here is my code snippet:

case class Demo(city: String, country: String, zipcode: Int)

The map stage returns an instance of the Demo type:

val env = StreamExecutionEnvironment.getExecutionEnvironment

val properties = new Properties()
properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
properties.setProperty("zookeeper.connect", "127.0.0.1:2181")
properties.setProperty("group.id", "test_topic")

val mapToDemo: String => Demo = { /* implementation */ }

val stream = env.addSource(new FlinkKafkaConsumer082[String]("test",
  new SimpleStringSchema, properties))

stream.map(mapToDemo).addSink(new FlinkKafkaProducer[Demo]("127.0.0.1:9092",
  "test_topic", new SimpleStringSchema()))

Can anyone explain what I am doing wrong in adding Kafka as a sink?
--
Thanks,
Deepak Jha

Re: Kafka As Sink in Flink Streaming

Robert Metzger
Hi,

did you get any error message while executing the job? I don't think you
can serialize the "Demo" type with the "SimpleStringSchema".
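[Editor's note: the type mismatch Robert describes can be sketched without a Flink cluster. A SerializationSchema for Demo ultimately has to turn a Demo into the byte array Kafka expects; the standalone sketch below shows one way that conversion could look. The DemoSerializer object and its CSV layout are illustrative assumptions, not part of Flink's API or the original code.]

```scala
import java.nio.charset.StandardCharsets

case class Demo(city: String, country: String, zipcode: Int)

// Hypothetical sketch of the work a serialization schema for Demo would do:
// Demo -> bytes on the producer side, bytes -> Demo on the consumer side.
object DemoSerializer {
  // Encode as a "city,country,zipcode" CSV line in UTF-8 (layout is an assumption).
  def serialize(d: Demo): Array[Byte] =
    s"${d.city},${d.country},${d.zipcode}".getBytes(StandardCharsets.UTF_8)

  // Reverse the encoding: split the CSV line back into the three fields.
  def deserialize(bytes: Array[Byte]): Demo = {
    val Array(city, country, zip) = new String(bytes, StandardCharsets.UTF_8).split(",")
    Demo(city, country, zip.toInt)
  }
}
```

SimpleStringSchema only knows how to do this for String, which is why handing it a Demo-typed stream fails.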




On Fri, Jan 22, 2016 at 8:13 PM, Deepak Jha <[hidden email]> wrote:

Re: Kafka As Sink in Flink Streaming

Deepak Jha
Hi Robert,
No, it was a compile-time issue. Actually, I tried to write a plain String
as well, but it did not work for me. Just for clarity, my
flink-connector-kafka version is 0.10.1.

I was able to fix the issue: SimpleStringSchema should be replaced with
JavaDefaultStringSchema, as the latter does the conversion from String to
byte array.

Thanks for the help though.
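[Editor's note: the fix described above amounts to mapping the Demo stream back to String before the sink, so the producer's string schema only ever sees String values. The snippet below simulates that pipeline shape with a plain Scala collection instead of a Flink DataStream; the demoToString function and the CSV layout are illustrative assumptions, not from the original code.]

```scala
import java.nio.charset.StandardCharsets

case class Demo(city: String, country: String, zipcode: Int)

// Illustrative stand-in for the original map stage: parse "city,country,zip".
val mapToDemo: String => Demo = { line =>
  val Array(city, country, zip) = line.split(",")
  Demo(city, country, zip.toInt)
}

// The extra step the fix implies: Demo -> String before the sink, so a
// string schema (e.g. JavaDefaultStringSchema) only has to produce bytes.
val demoToString: Demo => String = d => s"${d.city},${d.country},${d.zipcode}"

// Simulated pipeline: source -> map -> string -> byte payloads for Kafka.
val source = List("Seattle,US,98101", "Berlin,DE,10115")
val payloads: List[Array[Byte]] =
  source.map(mapToDemo).map(demoToString).map(_.getBytes(StandardCharsets.UTF_8))
```

In the real job this corresponds to inserting `.map(demoToString)` before `.addSink(...)`, so the producer's type parameter is String rather than Demo.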

On Fri, Jan 22, 2016 at 1:34 PM, Robert Metzger <[hidden email]> wrote:



--
Thanks,
Deepak Jha