Hi all!
We have started some preliminary work on the Flink Elasticsearch integration (an ES connector for Elasticsearch 7) at Hikvision Research Institute. The integration needs to be thought through carefully, and we want to contribute our code to the community. So I am opening a discussion thread with my initial ideas to get some early feedback.

*Minimal background*
Some companies genuinely have the scenario of reading from Elasticsearch. You can see our initial discussion here:
https://issues.apache.org/jira/browse/FLINK-16713

*Design docs*
Here is our design doc for adapting to different ES versions such as 5, 6, and 7. Because we cannot access Google Docs, we used a Yuque project instead:
https://www.yuque.com/jackylau-sc7w6/bve18l/14a2ad5b7f86998433de83dd0f8ec067

We are very much looking forward to your response.

Cheers,
Jacky Lau
Hi Jacky,
thanks a lot for starting the discussion. I have no objections to adding support for reading data from Elasticsearch as well, as long as we clearly state the performance and correctness implications / guarantees in the Flink documentation.
Hi Robert Metzger:
Thanks for your response. Could you please read this doc:
https://www.yuque.com/jackylau-sc7w6/bve18l/14a2ad5b7f86998433de83dd0f8ec067
Are there any problems with it? We are worried that we have not thought it through. Thanks.
Hi, Jacky.
Thanks for driving this discussion. I think this proposal should be a FLIP [1], since it impacts public interfaces. However, as we have only had some preliminary discussions so far, a design draft would be OK. But it would be better to organize your document according to [2].

I have two basic questions:
- Could you summarize all the public APIs and configurations (DDL) of the ElasticSearchTableSource?
- If we want to implement an Elasticsearch DataStream source at the same time, do we need to do a lot of extra work apart from this?

It would also be good if you could run some tests (performance and correctness) to address Robert's comments.

[1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template

Best,
Yangze Guo
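[Editor's note: as an illustration of the kind of DDL summary the first question asks for, here is a sketch registered through the Table API. Everything in it is hypothetical: the connector identifier and the 'hosts'/'index' option keys are placeholders that loosely mirror the existing ES sink options, not the settled source design.]

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EsSourceDdlSketch {
    public static void main(String[] args) {
        // Flink 1.11-era setup; batch mode because the proposed source is bounded.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // Hypothetical DDL: the option keys are placeholders, not a proposal.
        tEnv.executeSql(
                "CREATE TABLE users (\n"
                        + "  user_id STRING,\n"
                        + "  user_name STRING,\n"
                        + "  age INT\n"
                        + ") WITH (\n"
                        + "  'connector' = 'elasticsearch-7',\n"
                        + "  'hosts' = 'http://localhost:9200',\n"
                        + "  'index' = 'users'\n"
                        + ")");

        tEnv.executeSql("SELECT user_name, age FROM users WHERE age > 30").print();
    }
}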
Hi,
I made the Elasticsearch connector for Apache Beam, and I was thinking about doing the same for Flink when I came across this discussion. I have some comments regarding the design doc:

1. Streaming source

ES has a data streams feature, but only for time-series data; the aim of this source is to read all kinds of data. Apart from data streams, ES behaves like a database: you read the content of an index (similar to a table) matching a given query (similar to SQL). So, regarding streaming changes: if there are changes between two read requests made by the source, the second request will read the whole index (including the changes) again. I therefore see no way to get a regular flow of document updates (insertion, deletion, update) as we would need for a streaming source. Regarding failover: I guess exactly-once semantics cannot be guaranteed, only at-least-once, since there is no ack mechanism for already-read data. As a conclusion, IMO you are right to target only a batch source. This also answers Yangze Guo's question about a streaming source. The question is: can a batch-only source be accepted as a built-in Flink source?

2. Hadoop ecosystem

Why not use RichParallelSourceFunction?

3. Splitting

Splitting with one split = one ES shard could lead to sub-parallelism. IMHO, what matters is the number of executors in the Flink cluster: it is better to use runtimeContext.getIndexOfThisSubtask() and runtimeContext.getMaxNumberOfParallelSubtasks() to split the input data using the ES slice API (a sketch follows this message).

4. Targeting ES 5, 6, 7

In Beam I used the low-level REST client because it is compatible with all ES versions, which allows a single code base for all of them. But this client is very low level (string-based requests). The High Level REST Client now exists (it was not available at the time), and it is the one I would use. It is also available for ES 5, so you should use it there instead of the deprecated Transport client.

Best,

Etienne Chauchot
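[Editor's note: on point 3, a minimal sketch of slice-based reading with the High Level REST Client. The index name, host, and the choice of one slice per subtask are assumptions for illustration.]

import java.io.IOException;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.slice.SliceBuilder;

public class SlicedScrollSketch {

    // In a Flink source, sliceId / totalSlices would come from
    // runtimeContext.getIndexOfThisSubtask() and
    // runtimeContext.getNumberOfParallelSubtasks().
    static void readSlice(RestHighLevelClient client, int sliceId, int totalSlices)
            throws IOException {
        SearchSourceBuilder source = new SearchSourceBuilder()
                .query(QueryBuilders.matchAllQuery())
                .slice(new SliceBuilder(sliceId, totalSlices))
                .size(1000);

        SearchRequest request = new SearchRequest("users") // placeholder index name
                .source(source)
                .scroll(TimeValue.timeValueMinutes(1L));

        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        while (response.getHits().getHits().length > 0) {
            for (SearchHit hit : response.getHits().getHits()) {
                System.out.println(hit.getSourceAsString()); // a real source would emit downstream
            }
            SearchScrollRequest next = new SearchScrollRequest(response.getScrollId())
                    .scroll(TimeValue.timeValueMinutes(1L));
            response = client.scroll(next, RequestOptions.DEFAULT);
        }
        // A real implementation would also clear the scroll here.
    }

    public static void main(String[] args) throws IOException {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            readSlice(client, 0, 4); // e.g. subtask 0 of parallelism 4
        }
    }
}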
Thanks Jacky for starting this discussion.
The requirement for an ES source has been proposed in the community many times. +1 for the feature from my side.

Here are my thoughts:

1. Streaming source
As we only support bounded sources for JDBC and HBase, I think it's fine to have a bounded ES source.

2. Lookup source
Have you thought about having ES as a lookup source, just like an HBase table, with lookups by index? I'm not sure whether it works, but my gut feeling tells me it is an interesting feature and there may be a need for it.

3. DDL options
I agree with Yangze: it is important to list the new options you want to add. It would be nice to organize your design doc according to the FLIP template (with "Public Interface" and "Proposed Changes" sections).

4. Implement against the new table source interfaces (FLIP-95)
In 1.11 we proposed a new set of table connector interfaces (FLIP-95) with more powerful features. The old table source interfaces will be removed in the future. (A skeleton sketch follows this message.)

5. DataStream source
It would be nice to expose a DataStream source too, and to share implementations as much as possible.

Best,
Jark
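[Editor's note: on point 4, a minimal sketch of the FLIP-95 shape. The class names (ElasticsearchDynamicSource, EsRowDataInputFormat) are made up for illustration; a real implementation would carry the connection options, the schema, and the factory plumbing.]

import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;

public class ElasticsearchDynamicSource implements ScanTableSource {

    @Override
    public ChangelogMode getChangelogMode() {
        // A bounded scan of an ES index only ever produces insertions.
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        // The InputFormat would wrap the scroll/slice reading logic.
        return InputFormatProvider.of(new EsRowDataInputFormat());
    }

    @Override
    public DynamicTableSource copy() {
        return new ElasticsearchDynamicSource();
    }

    @Override
    public String asSummaryString() {
        return "Elasticsearch table source (sketch)";
    }

    // Stub standing in for the real scroll-based reader.
    private static class EsRowDataInputFormat extends GenericInputFormat<RowData> {
        @Override
        public boolean reachedEnd() {
            return true; // real code: false until the ES scroll is exhausted
        }

        @Override
        public RowData nextRecord(RowData reuse) {
            return null; // real code: convert the next SearchHit to RowData
        }
    }
}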
Hi Etienne Chauchot:

Thanks for the discussion.

For 1): we do not support an unbounded ES source currently.

For 2): RichParallelSourceFunction is used for streaming; InputFormat is for batch.

For 3): I downloaded Beam just now, and the Beam ES connector also uses es-hadoop. I have read the es-hadoop code (its input split contains both shard and slice, which I think is better when different shards hold different numbers of docs); you can see it here: https://github.com/elastic/elasticsearch-hadoop. But the code is not good, so we do not want to reference it. And if you look at Presto, it also uses input splits based only on shards, without slices (a sketch of such a split follows this message).

For 4): the Flink ES connector already uses different clients (the Transport client for ES 5, the High Level REST Client for ES 6 and 7), so we just reuse them, which avoids changing too much code.
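[Editor's note: for illustration, a sketch of the shard-only split the Presto-style approach implies. The class is hypothetical; the "_shards:<id>" preference string is the standard Elasticsearch way to pin a search to a single shard.]

import org.apache.flink.core.io.InputSplit;

// Hypothetical split: one split per ES shard, no slicing.
public class ElasticsearchInputSplit implements InputSplit {

    private static final long serialVersionUID = 1L;

    private final int splitNumber;
    private final String index;
    private final int shardId;

    public ElasticsearchInputSplit(int splitNumber, String index, int shardId) {
        this.splitNumber = splitNumber;
        this.index = index;
        this.shardId = shardId;
    }

    @Override
    public int getSplitNumber() {
        return splitNumber;
    }

    public String getIndex() {
        return index;
    }

    // Passed as the "preference" of the search request so that the
    // scroll only touches this shard, e.g. "_shards:2".
    public String getPreference() {
        return "_shards:" + shardId;
    }
}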
Thanks, Jark Wu and Yangze.
1) We will reorganize our design and move it into a FLIP.
2) We will support a lookup source and a DynamicTableSource. The DynamicTableSource pushdown optimizations, such as SupportsLimitPushDown, have not been implemented yet, so we will add them after that work is done.

Thanks for your response.
Hi Etienne Chauchot:

You can read this article (in Chinese): https://www.jianshu.com/p/d32e17dab90c. From it you can see that the slice API performs poorly in the es-hadoop project.

I also found that es-hadoop has deprecated this and disables sliced scrolls by default. See below, from the latest es-hadoop release notes:

==== Configuration Changes
`es.input.use.sliced.partitions` is deprecated in 6.5.0, and will be removed in 7.0.0. The default value for `es.input.max.docs.per.partition` (100000) will also be removed in 7.0.0, thus disabling sliced scrolls by default, and switching them to be an explicitly opt-in feature.

added[5.0.0]
`es.input.max.docs.per.partition` ::
When reading from an {es} cluster that supports scroll slicing ({es} v5.0.0 and above), this parameter advises the connector on what the maximum number of documents per input partition should be. The connector will sample and estimate the number of documents on each shard to be read and divides each shard into input slices using the value supplied by this property. This property is a suggestion, not a guarantee. The final number of documents per partition is not guaranteed to be below this number; rather, it will be close to this number. This property is ignored if you are reading from an {es} cluster that does not support scroll slicing ({es} any version below v5.0.0). By default, this value is unset, and the input partitions are calculated based on the number of shards in the indices being read.
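[Editor's note: the partitioning rule in the quoted passage reduces to simple arithmetic; a sketch follows. The per-shard document count would come from the sampling/estimation the docs describe.]

public class SlicePartitioning {

    // es-hadoop's documented rule: each shard is divided into roughly
    // ceil(docsOnShard / maxDocsPerPartition) input slices.
    static int slicesForShard(long docsOnShard, long maxDocsPerPartition) {
        if (maxDocsPerPartition <= 0) {
            return 1; // property unset: one partition per shard
        }
        return (int) Math.max(1L, (docsOnShard + maxDocsPerPartition - 1) / maxDocsPerPartition);
    }

    public static void main(String[] args) {
        // A shard holding 450,000 docs with the old default of
        // 100,000 docs per partition yields 5 slices.
        System.out.println(slicesForShard(450_000L, 100_000L));
    }
}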
Hi Jacky Lau:

I agree with Jark's point of view: ES is used not only for reading raw data; more often it is used for grouped queries and aggregations.

Best,
Forward
Hi Jacky Lau,
1) Yes, I saw that.

2) I saw sources like IntegerSource which are bounded and which extend RichParallelSourceFunction. This is why I mentioned it (a sketch of the pattern follows this message).

3) True, there is a Hadoop-based ES connector in Beam, but it is more of a side connector; the main one is ElasticsearchIO, here:
https://github.com/apache/beam/blob/095589c28f5c427bf99fc0330af91c859bb2ad6b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L156
and it does not use Hadoop.

4) Yes, but using the same client could simplify the code in the end. I agree it needs more changes to the current code.

Etienne
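[Editor's note: on point 2, a minimal sketch of the bounded-RichParallelSourceFunction pattern IntegerSource follows. The range logic here is illustrative, not copied from IntegerSource.]

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

// A bounded parallel source: each subtask emits its share of [0, total)
// and then returns from run(), which ends its stream.
public class BoundedRangeSource extends RichParallelSourceFunction<Long> {

    private final long total;
    private volatile boolean running = true;

    public BoundedRangeSource(long total) {
        this.total = total;
    }

    @Override
    public void run(SourceContext<Long> ctx) {
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        for (long i = subtask; running && i < total; i += parallelism) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(i);
            }
        }
        // Falling out of run() is what makes this source bounded.
    }

    @Override
    public void cancel() {
        running = false;
    }
}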