Hi all,
I have a workload where I need to read and transform large amounts of data from Elasticsearch. I'm currently using Flink only for streaming, but I thought it could also be a good fit for this kind of batch job. However, I did not find a way to load data from Elasticsearch into Flink in parallel.

I'd like to propose an ElasticsearchInputFormat which will be able to load data from Elasticsearch in parallel by leveraging the InputSplit mechanism in Flink and the Elasticsearch scroll API.

The API should look something like this:

    ElasticsearchInputFormat<MessageObj> elasticsearchInputFormat =
        ElasticsearchInputFormat.builder(esHostList, query, esMapper, typeInfo)
            .setParametersProvider(paramsProvider)
            .setIndex("index-name")
            .setType("type-name")
            .build();

    DataSet<MessageObj> input = env.createInput(elasticsearchInputFormat);

The 'query' is a regular ES query specifying the data to fetch.
The 'esMapper' maps the JSON documents returned from Elasticsearch to some object (MessageObj in the example above).
In order for it to work in parallel, the InputFormat will work with InputSplits that get their parameters, i.e. how to split a certain range, from the 'paramsProvider'.

What do you think?

Best,
Michael.
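P.S. To make the split mechanism a bit more concrete, here is a rough sketch of what a 'paramsProvider' could look like. This is purely illustrative (none of these classes exist in Flink today); it assumes the splits partition a numeric range, e.g. a timestamp field, and that each split would combine its bounds with the user query as a range filter and drive its own scroll:

    import java.io.Serializable;

    // Illustrative only, not an existing Flink API: produces one
    // [lower, upper) bound pair per parallel split.
    public class RangeParametersProvider implements Serializable {

        private final long min;
        private final long max;
        private final int numSplits;

        public RangeParametersProvider(long min, long max, int numSplits) {
            this.min = min;
            this.max = max;
            this.numSplits = numSplits;
        }

        public long[][] getParameterValues() {
            long[][] bounds = new long[numSplits][2];
            long step = (max - min) / numSplits;
            for (int i = 0; i < numSplits; i++) {
                bounds[i][0] = min + i * step;
                // The last split absorbs the remainder of the integer division.
                bounds[i][1] = (i == numSplits - 1) ? max : min + (i + 1) * step;
            }
            return bounds;
        }
    }

Alternatively, on Elasticsearch 5+ the sliced scroll feature could back each InputSplit with one scroll slice, which would avoid the need for a user-supplied range.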
Hi Michael,
have you considered trying out the EsInputFormat [1] with Flink's HadoopInputFormatBase? That way, reading from Elasticsearch might already work out of the box. If not, then adding a dedicated Elasticsearch input format would definitely be helpful.

[1] https://github.com/elastic/elasticsearch-hadoop
Hi Till,
Thanks for the great suggestion! Seems like it does the job. Here is a sample of the code:

    import java.util.List;

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.elasticsearch.hadoop.mr.EsInputFormat;
    import org.elasticsearch.hadoop.mr.LinkedMapWritable;

    public class FlinkMain {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            EsInputFormat<Text, LinkedMapWritable> kvEsInputFormat = new EsInputFormat<>();
            HadoopInputFormat<Text, LinkedMapWritable> hadoopInputFormat =
                new HadoopInputFormat<>(kvEsInputFormat, Text.class, LinkedMapWritable.class);

            // es.resource is "<index>/<type>"; es.query is a URI-style query.
            Configuration configuration = hadoopInputFormat.getConfiguration();
            configuration.set("es.resource", "flink-1/flink_t");
            configuration.set("es.query", "?q=*");

            DataSet<Tuple2<Text, LinkedMapWritable>> input = env.createInput(hadoopInputFormat);

            List<Tuple2<Text, LinkedMapWritable>> collect = input.collect();
            collect.forEach(System.out::println);
        }
    }
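From there, the raw writables can be mapped into a typed object before further transformation. A rough sketch (MessageObj and the "message" field are placeholders for the actual document schema; it additionally needs imports for org.apache.flink.api.common.functions.MapFunction and org.apache.hadoop.io.Writable):

    // Convert Tuple2<Text, LinkedMapWritable> (document id, _source map)
    // into a plain Java object. MessageObj and "message" are placeholders.
    DataSet<MessageObj> messages = input.map(
        new MapFunction<Tuple2<Text, LinkedMapWritable>, MessageObj>() {
            @Override
            public MessageObj map(Tuple2<Text, LinkedMapWritable> record) {
                MessageObj obj = new MessageObj();
                obj.setId(record.f0.toString()); // f0 holds the document id
                Writable field = record.f1.get(new Text("message")); // f1 is the _source map
                obj.setMessage(field == null ? null : field.toString());
                return obj;
            }
        });

Using an anonymous MapFunction rather than a lambda here avoids type-erasure issues with Flink's type extraction for generic tuples.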
Great to hear that it works :-)