ElasticSearch Connector

ElasticSearch Connector

Ramya Ramamurthy
Hi,

I am learning Flink. With Flink 1.7.1, I am trying to read from Kafka and
insert into Elasticsearch. I have a Kafka connector that converts the data to a
Flink table. In order to insert into Elasticsearch, I have converted this
table to a DataStream so that I can use the ElasticsearchSink.
But the Rows returned by the stream have lost the schema. How do I convert
them to JSON before calling the Elasticsearch sink connector? Any help or
suggestions would be appreciated.
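Roughly, the conversion I have looks like this (simplified; the query and
field names are just placeholders):

// Simplified sketch: the table result is converted into a DataStream<Row>,
// but the Rows are positional values only, so the field names/types from
// the table schema are no longer attached to them.
Table result = tableEnv.sqlQuery("SELECT sid, ip, family, total_hits FROM sourceTopic");
DataStream<Row> rows = tableEnv.toAppendStream(result, Row.class);
// It is this stream that I want to write as JSON via the ElasticsearchSink.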

Thanks.

Re: ElasticSearch Connector

dwysakowicz
Hi Ramya,

Have you tried writing to ES directly from the Table API? You can check
the ES connector for the Table API here:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#elasticsearch-connector
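For reference, a minimal sink definition with the descriptor API could look
roughly like this (a sketch only; the index, document type, sink name and
field names are examples, not something your job has to use):

tableEnv.connect(
        new Elasticsearch()
            .version("6")
            .host("localhost", 9200, "http")
            .index("test")
            .documentType("user"))
    .withSchema(new Schema()
            .field("sid", Types.STRING())
            .field("total_hits", Types.LONG()))
    .withFormat(new Json().deriveSchema())   // documents are written as JSON
    .inAppendMode()
    .registerTableSink("esSink");

// then: resultTable.insertInto("esSink");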

Best,

Dawid


Re: ElasticSearch Connector

Ramya Ramamurthy
Hi Dawid,

Thanks for the quick reply.
I did try that, but I am not sure how to write into rolling indices.
For example, I maintain daily indices on ES and, based on the event time,
I would like to route the packets to the appropriate indices. If there was
some lag in the source Kafka and I receive yesterday's data only today
(say at 00:05 or so), I am not sure which index they should go into.
Is there a way to work around this?

Regards,
~Ramya.


Re: ElasticSearch Connector

miki haiat
You can have Flink work on event time by setting TimeCharacteristic.EventTime [1]
and assigning watermarks. Then, even if the source is lagging, the data can
still be routed to the correct indices in Elasticsearch according to its event time.
A more specific way to implement this with Kafka is described in [2];
a minimal sketch follows the links below.




1.
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#assigning-timestamps
2.
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
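A minimal sketch of [1] + [2] with the 0.11 Kafka consumer (the topic name,
group id and the "ts" timestamp field are assumptions for illustration):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "es-writer");                  // assumed group id

FlinkKafkaConsumer011<ObjectNode> consumer = new FlinkKafkaConsumer011<>(
        "read-topic",                                         // assumed topic name
        new JSONKeyValueDeserializationSchema(false),
        props);

// Event-time timestamps taken from an assumed "ts" field, allowing 1 minute of lateness.
consumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<ObjectNode>(Time.minutes(1)) {
            @Override
            public long extractTimestamp(ObjectNode record) {
                return record.get("value").get("ts").asLong();
            }
        });

DataStream<ObjectNode> stream = env.addSource(consumer);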



Re: ElasticSearch Connector

Ramya Ramamurthy
Hi,

Sorry, I am a beginner here. I am not really sure how to produce dynamic
indices, i.e. .index("test-ddmmyy")-style index names.
I have set the watermark for my Kafka table source, but I am not sure how
that carries over to the Elasticsearch sink.

I have pasted my sample code below:

tableEnv.connect(new Kafka()
      .version("0.11")
      .topic(params.getRequired("write-topic"))
      .property("bootstrap.servers", "localhost:9092")
      .sinkPartitionerRoundRobin())
      .withSchema(new Schema()
            .field("sid", Types.STRING())
            .field("ip", Types.STRING())
            .field("family", Types.STRING())
            .field("total_hits", Types.LONG())
            .field("tumbleStart", Types.SQL_TIMESTAMP())
            .field("tumbleEnd", Types.SQL_TIMESTAMP())
      )
      .withFormat(new Json().deriveSchema())
      .inAppendMode()
      .registerTableSink("sinkTopic");


new Elasticsearch()
      .version("6")
      .host("localhost", 9200, "http")
      .index("test")  // How to pass dynamic indices here, based on the packet received from the table sink?
      .documentType("user")
      .failureHandlerRetryRejected()
      .failureHandlerIgnore()

      .bulkFlushMaxSize("20 mb")
      .bulkFlushInterval(100000L)
      .bulkFlushBackoffMaxRetries(3)

      .connectionMaxRetryTimeout(3)
      .connectionPathPrefix("/v1")


Thanks again !!



Re: ElasticSearch Connector

dwysakowicz

Hi,

I'm afraid you cannot write to different indices using the Table API Elasticsearch connector. Now I see why you wanted to go through the DataStream API.

To transform from Row to JSON you can use org.apache.flink.formats.json.JsonRowSerializationSchema from flink-json. You just need to get the schema from the final Table of your Table API part. Your code could look like this:


TypeInformation<Row> schema = table.getSchema().toRowType();

SerializationSchema<Row> serializationSchema = new JsonRowSerializationSchema(schema);

public IndexRequest createIndexRequest(Row element) {
    byte[] document = serializationSchema.serialize(element);

    // ...

    return new IndexRequest(index, docType)
            .source(document, contentType);
}
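For the rolling indices, a rough sketch of how this could be wired into the
DataStream Elasticsearch 6 sink, reusing the serializationSchema above (the
"test-" prefix, the ddMMyy pattern, and the assumption that the event time
sits in the 5th field, tumbleStart, are mine, not anything the connector fixes):

DataStream<Row> rows = tableEnv.toAppendStream(table, Row.class);

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("localhost", 9200, "http"));

ElasticsearchSink.Builder<Row> builder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ElasticsearchSinkFunction<Row>() {
            @Override
            public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
                // Derive a daily index from the row's event time
                // (assumes tumbleStart is the 5th field of the schema above).
                java.sql.Timestamp tumbleStart = (java.sql.Timestamp) element.getField(4);
                String index = "test-" + new SimpleDateFormat("ddMMyy").format(tumbleStart);

                byte[] document = serializationSchema.serialize(element);
                indexer.add(Requests.indexRequest()
                        .index(index)
                        .type("user")
                        .source(document, XContentType.JSON));
            }
        });

rows.addSink(builder.build());

In a real job you would probably reuse the date formatter instead of creating
one per record, but this shows the idea of choosing the index per element.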

Best,

Dawid
