Hi,
I am learning Flink. With Flink 1.7.1, I am trying to read from Kafka and insert into Elasticsearch. I have a Kafka connector that converts the data into a Flink table. In order to insert into Elasticsearch, I have converted this table into a DataStream so that I can use the ElasticsearchSink. But the Rows returned by the stream have lost the schema. How do I convert them to JSON before calling the Elasticsearch sink connector? Any help or suggestions would be appreciated.

Thanks.
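For context, a rough, untested sketch of the setup described above. It assumes an existing StreamTableEnvironment called tableEnv; the topic name, field names and query are only placeholders. The last line is the Table-to-DataStream conversion where the per-record field names are lost:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.Types;
    import org.apache.flink.table.descriptors.Json;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Schema;
    import org.apache.flink.types.Row;

    tableEnv.connect(new Kafka()
            .version("0.11")
            .topic("read-topic")                       // placeholder topic
            .property("bootstrap.servers", "localhost:9092"))
        .withFormat(new Json().deriveSchema())
        .withSchema(new Schema()
            .field("sid", Types.STRING())              // placeholder fields
            .field("total_hits", Types.LONG()))
        .inAppendMode()
        .registerTableSource("sourceTopic");

    Table result = tableEnv.sqlQuery("SELECT sid, total_hits FROM sourceTopic");

    // Converting to a DataStream yields plain Row objects: the values are there,
    // but the field names/schema are no longer attached to each record.
    DataStream<Row> rows = tableEnv.toAppendStream(result, Row.class);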
Hi Ramya,
Have you tried writing to ES directly from the Table API? You can check the ES connector for the Table API here: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#elasticsearch-connector

Best,

Dawid
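For reference, a minimal sketch of what registering such a sink could look like with the descriptor API in Flink 1.7. The sink name, index, document type and fields are placeholders, and an existing tableEnv and result table are assumed:

    import org.apache.flink.table.api.Types;
    import org.apache.flink.table.descriptors.Elasticsearch;
    import org.apache.flink.table.descriptors.Json;
    import org.apache.flink.table.descriptors.Schema;

    tableEnv.connect(new Elasticsearch()
            .version("6")
            .host("localhost", 9200, "http")
            .index("test")
            .documentType("user"))
        .withFormat(new Json().deriveSchema())
        .withSchema(new Schema()
            .field("sid", Types.STRING())          // placeholder fields
            .field("total_hits", Types.LONG()))
        .inAppendMode()
        .registerTableSink("esSink");

    // write the result table straight into the registered sink
    resultTable.insertInto("esSink");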
Hi Dawid,
Thanks for the quick reply. I did try that, but I am not sure how to push into rolling indices there. For example, I maintain daily indices on ES, and based on the event time I would like to route the packets to the appropriate index. If there is some lag in the Kafka source and I receive yesterday's data (say at 00:05 or so), I am not sure how to pick the index. Is there a way around this?

Regards,
~Ramya.
You can use Flink's event-time support by setting TimeCharacteristic.EventTime [1] and assigning watermarks. Then, even if the source lags, the data will be inserted into the correct indices in Elasticsearch according to its event time. A more specific way to implement this with Kafka is described in [2].

1. https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#assigning-timestamps
2. https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
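A rough, untested sketch of that idea, assuming the Kafka records are JSON strings carrying an epoch-millis field called "ts" (a hypothetical name); the topic, group id and allowed lateness are placeholders:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

    import java.util.Properties;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "es-writer");

    FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
            "read-topic", new SimpleStringSchema(), props);

    // Tolerate records that arrive up to 5 minutes late; the watermark trails
    // the largest seen timestamp by that amount.
    consumer.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<String>(Time.minutes(5)) {
                private final ObjectMapper mapper = new ObjectMapper();

                @Override
                public long extractTimestamp(String json) {
                    try {
                        // "ts" is a hypothetical epoch-millis field in the payload
                        return mapper.readTree(json).get("ts").asLong();
                    } catch (Exception e) {
                        return Long.MIN_VALUE; // treat unparsable records as very old
                    }
                }
            });

    DataStream<String> events = env.addSource(consumer);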
Hi,
Sorry, I am a beginner here. I am not really sure how to produce the dynamic indices here, i.e. the .index("test-ddmmyy") kind of indices. I have set the watermark for my Kafka table source, but I am not sure how this carries over to the Elasticsearch sink. I have pasted my sample code below:

    tableEnv.connect(new Kafka()
            .version("0.11")
            .topic(params.getRequired("write-topic"))
            .property("bootstrap.servers", "localhost:9092")
            .sinkPartitionerRoundRobin())
        .withSchema(new Schema()
            .field("sid", Types.STRING())
            .field("ip", Types.STRING())
            .field("family", Types.STRING())
            .field("total_hits", Types.LONG())
            .field("tumbleStart", Types.SQL_TIMESTAMP())
            .field("tumbleEnd", Types.SQL_TIMESTAMP()))
        .withFormat(new Json().deriveSchema())
        .inAppendMode()
        .registerTableSink("sinkTopic");

    new Elasticsearch()
        .version("6")
        .host("localhost", 9200, "http")
        .index("test")   // ---- How to pass dynamic indices here, based on the packet received from the table sink?
        .documentType("user")
        .failureHandlerRetryRejected()
        .failureHandlerIgnore()
        .bulkFlushMaxSize("20 mb")
        .bulkFlushInterval(100000L)
        .bulkFlushBackoffMaxRetries(3)
        .connectionMaxRetryTimeout(3)
        .connectionPathPrefix("/v1")

Thanks again!
Hi, I'm afraid you cannot write to different indices using the Table API Elasticsearch connector. Now I know why you wanted to go through the DataStream API. What you can do to transform a Row to JSON is to use org.apache.flink.formats.json.JsonRowSerializationSchema from flink-json. You just need to get the schema from the final Table of your Table API part. Your code could look like this:
    TypeInformation<Row> schema = table.getSchema().toRowType();
    SerializationSchema<Row> serializationSchema = new JsonRowSerializationSchema(schema);

    ...

    byte[] document = serializationSchema.serialize(row);
    ...
    return new IndexRequest(index, docType);

Best,

Dawid
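To tie the two suggestions together, below is a rough, untested sketch of a DataStream-API Elasticsearch sink that serializes each Row with JsonRowSerializationSchema and derives a daily index name from the row's tumbleEnd field. It assumes the field layout pasted earlier (tumbleEnd as the sixth field), an existing table and tableEnv, and the flink-connector-elasticsearch6 dependency; the "test-" index prefix and document type are placeholders:

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.formats.json.JsonRowSerializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
    import org.apache.flink.types.Row;
    import org.apache.http.HttpHost;
    import org.elasticsearch.client.Requests;
    import org.elasticsearch.common.xcontent.XContentType;

    import java.sql.Timestamp;
    import java.time.format.DateTimeFormatter;
    import java.util.Collections;

    // schema of the final Table, as suggested above
    TypeInformation<Row> schema = table.getSchema().toRowType();
    SerializationSchema<Row> serializationSchema = new JsonRowSerializationSchema(schema);

    DataStream<Row> rows = tableEnv.toAppendStream(table, Row.class);

    ElasticsearchSink.Builder<Row> sinkBuilder = new ElasticsearchSink.Builder<Row>(
            Collections.singletonList(new HttpHost("localhost", 9200, "http")),
            new ElasticsearchSinkFunction<Row>() {
                @Override
                public void process(Row row, RuntimeContext ctx, RequestIndexer indexer) {
                    // assumption: tumbleEnd is the 6th field (index 5) of the schema above
                    Timestamp tumbleEnd = (Timestamp) row.getField(5);
                    String index = "test-" + tumbleEnd.toLocalDateTime()
                            .format(DateTimeFormatter.ofPattern("ddMMyy")); // daily rolling index

                    indexer.add(Requests.indexRequest()
                            .index(index)
                            .type("user")
                            .source(serializationSchema.serialize(row), XContentType.JSON));
                }
            });

    sinkBuilder.setBulkFlushMaxSizeMb(20);
    rows.addSink(sinkBuilder.build());

Since the index name is computed per record from the event time, late records that belong to yesterday still end up in yesterday's index, regardless of when they arrive.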