Flink Elasticsearch Sink Issue

Flink Elasticsearch Sink Issue

Ramya Ramamurthy
Hi,

We use a Kafka -> Flink -> Elasticsearch pipeline in our project.
Data is not flushed to Elasticsearch until the next batch arrives.
For example, if the first batch contains 1000 records, it is pushed to
Elasticsearch only after the next batch arrives, even when the bulk flush
interval has already elapsed.
Below are the sink configurations we currently use.

esSinkBuilder.setBulkFlushMaxActions(2000);   // flush after 2,000 buffered actions
esSinkBuilder.setBulkFlushMaxSizeMb(5);       // flush after 5 MB of buffered data
esSinkBuilder.setBulkFlushInterval(60000);    // flush at least once every 60 seconds
esSinkBuilder.setBulkFlushBackoff(true);      // enable retry with backoff on bulk failures
esSinkBuilder.setBulkFlushBackoffRetries(3);  // retry a failed bulk request three times
esSinkBuilder.setBulkFlushBackoffDelay(5000); // wait 5 seconds between retries

Sink code:

List<HttpHost> httpHosts = new ArrayList<>();
//httpHosts.add(new HttpHost("10.128.0.34", 9200, "http"));
httpHosts.add(new HttpHost("192.168.80.171", 9200, "http"));

ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ElasticsearchSinkFunction<Row>() {

            // Build an index request against a daily index, e.g. <esIndex>2019.06.21
            private IndexRequest createIndexRequest(byte[] document, String indexDate) {
                return new IndexRequest(esIndex + indexDate, esType)
                        .source(document, XContentType.JSON);
            }

            @Override
            public void process(Row r, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                byte[] byteArray = serializationSchema.serialize(r);

                ObjectMapper mapper = new ObjectMapper();
                ObjectWriter writer = mapper.writer();

                try {
                    JsonNode jsonNode = mapper.readTree(byteArray);

                    // Derive the daily index suffix from the event's "fseen" timestamp (epoch millis, UTC)
                    long tumbleStart = jsonNode.get("fseen").asLong();
                    ZonedDateTime utc = Instant.ofEpochMilli(tumbleStart).atZone(ZoneOffset.UTC);
                    String indexDate = DateTimeFormatter.ofPattern("yyyy.MM.dd").format(utc);

                    byte[] document = writer.writeValueAsBytes(jsonNode);
                    requestIndexer.add(createIndexRequest(document, indexDate));
                } catch (Exception e) {
                    // Swallowing the exception silently drops the record; at minimum log the cause
                    System.out.println("Failed to index record: " + e);
                }
            }
        }
);
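
For completeness, a minimal sketch of how the builder is typically finished and attached to the stream. Here "rowStream" is only a placeholder name for the upstream DataStream<Row>, not taken from the original job:

// Sketch: "rowStream" is a placeholder for the upstream DataStream<Row>.
// build() creates the sink with the bulk-flush settings configured above.
ElasticsearchSink<Row> esSink = esSinkBuilder.build();
rowStream.addSink(esSink).name("elasticsearch-sink");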

Has anyone faced this issue? Any help would be appreciated!

Thanks,

Re: Flink Elasticsearch Sink Issue

miki haiat
Did you set any checkpoint configuration?
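
For context, the Flink Elasticsearch sink also flushes its buffered bulk requests whenever a checkpoint is taken (flushOnCheckpoint is enabled by default), so the checkpoint interval matters here as well. A minimal sketch of enabling checkpointing, with an example interval only:

// Sketch: enable checkpointing so the ES sink flushes pending requests on each checkpoint.
// The 60-second interval is only an example value, not a recommendation.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);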


Re: Flink Elasticsearch Sink Issue

Ramya Ramamurthy
Yes, we do enable checkpointing:
env.enableCheckpointing(300000);

But we assumed this only affects Kafka consumer offsets. How is it relevant
in this case? Could you please elaborate?

~Ramya.

Re: Flink Elasticsearch Sink Issue

Ramya Ramamurthy
By default, flushOnCheckpoint is set to true.
So ideally, with env.enableCheckpointing(300000), a flush to ES should be
triggered on every checkpoint, i.e. every 5 minutes (300000 ms), in addition
to our bulk flush interval of 60 seconds.
If that understanding is correct, we still do not see records being flushed
until the next batch arrives.
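
A minimal sketch of the two flush triggers at play, using the values mentioned in this thread (lowering them is only a way to test the behaviour, not a recommendation):

// Sketch, using the values from this thread. Two independent triggers should flush buffered requests:
env.enableCheckpointing(300000);            // checkpoint every 5 minutes; the sink flushes on each checkpoint
esSinkBuilder.setBulkFlushInterval(60000);  // time-based bulk flush every 60 seconds
// To check whether time-based flushing fires at all, both intervals can temporarily be lowered,
// e.g. enableCheckpointing(10000) and setBulkFlushInterval(5000).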

Thanks.
