Hi,
We use Kafka -> Flink -> Elasticsearch in our project. The data is not flushed to Elasticsearch until the next batch arrives. For example, if the first batch contains 1000 records, it is pushed to Elasticsearch only after the next batch arrives, irrespective of reaching the batch time limit.

Below are the sink configurations we currently use:

esSinkBuilder.setBulkFlushMaxActions(2000);     // 2K records
esSinkBuilder.setBulkFlushMaxSizeMb(5);         // 5 MB
esSinkBuilder.setBulkFlushInterval(60000);      // flush at least once per minute
esSinkBuilder.setBulkFlushBackoffRetries(3);    // retry three times if a bulk request fails
esSinkBuilder.setBulkFlushBackoffDelay(5000);   // retry after 5 seconds
esSinkBuilder.setBulkFlushBackoff(true);

Sink code:

List<HttpHost> httpHosts = new ArrayList<>();
//httpHosts.add(new HttpHost("10.128.0.34", 9200, "http"));
httpHosts.add(new HttpHost("192.168.80.171", 9200, "http"));

ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<Row>() {

        // Build an IndexRequest against a daily index (esIndex + yyyy.MM.dd)
        private IndexRequest createIndexRequest(byte[] document, String indexDate) {
            return new IndexRequest(esIndex + indexDate, esType)
                    .source(document, XContentType.JSON);
        }

        @Override
        public void process(Row r, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
            byte[] byteArray = serializationSchema.serialize(r);

            ObjectMapper mapper = new ObjectMapper();
            ObjectWriter writer = mapper.writer();

            try {
                JsonNode jsonNode = mapper.readTree(byteArray);

                // Derive the index date from the "fseen" epoch-millis field
                long tumbleStart = jsonNode.get("fseen").asLong();
                ZonedDateTime utc = Instant.ofEpochMilli(tumbleStart).atZone(ZoneOffset.UTC);
                String indexDate = DateTimeFormatter.ofPattern("yyyy.MM.dd").format(utc);

                byte[] document = writer.writeValueAsBytes(jsonNode);
                requestIndexer.add(createIndexRequest(document, indexDate));
            } catch (Exception e) {
                // NOTE: parse/serialization failures are only printed to stdout here
                System.out.println("In the error block");
            }
        }
    }
);

Has anyone faced this issue? Any help would be appreciated!

Thanks,
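Not shown above is how the configured builder actually gets attached to the job. Roughly, the missing piece looks like the sketch below; it assumes a DataStream<Row> named rowStream exists upstream, and that name is an assumption rather than something from the snippet:

ElasticsearchSink<Row> esSink = esSinkBuilder.build();   // materialize the sink from the configured builder
rowStream.addSink(esSink);                               // attach it so the index requests actually reach Elasticsearch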
Did you set any checkpoint configuration?
On Fri, Jun 21, 2019, 13:17 Ramya Ramamurthy <[hidden email]> wrote:
> [...]
Yes, we do maintain checkpoints:
env.enableCheckpointing(300000);

But we assumed this is only for the Kafka consumer offsets. We are not sure how it is useful in this case. Can you please elaborate on this?

~Ramya.

On Fri, Jun 21, 2019 at 4:33 PM miki haiat <[hidden email]> wrote:
> Did you set any checkpoint configuration?
By default, flushOnCheckpoint is set to true.
So ideally, based on env.enableCheckpointing(300000), the flush to ES should be triggered every 300 seconds (5 minutes), while our ES bulk flush interval is 60 seconds.

If the above assumption is correct, we still do not see records getting flushed until the next packet/batch arrives.

Thanks.

On Fri, Jun 21, 2019 at 6:07 PM Ramya Ramamurthy <[hidden email]> wrote:
> Yes, we do maintain checkpoints:
> env.enableCheckpointing(300000);
> [...]
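For what it's worth, one way to narrow this down is to check whether the checkpoint is the only trigger that ever fires: shorten the checkpoint interval well below the 60-second bulk flush interval, or opt out of checkpoint-driven flushing on the built sink and see whether the interval-based flush fires on its own. A rough sketch, with env, esSinkBuilder and rowStream assumed from the earlier snippets:

env.enableCheckpointing(30000);            // checkpoint every 30 s; with flushOnCheckpoint at its default, each checkpoint also flushes the buffered bulk requests
ElasticsearchSink<Row> esSink = esSinkBuilder.build();
// esSink.disableFlushOnCheckpoint();      // alternatively, disable checkpoint-driven flushing (weakens delivery guarantees) so only the bulk flush settings apply
rowStream.addSink(esSink);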