Starting this discussion here after an initial conversation with the Ververica and AWS teams during Flink Forward.

I'm investigating the performance of a Flink job that transports data from Kafka to an S3 sink. We are using a BucketingSink to write Parquet files. The bucketing logic puts the messages into a folder per data type, tenant (customer), date-time, extraction ID, etc. As a result, each file is stored in a folder structure composed of 9-10 layers (s3_bucket:/1/2/3/4/5/6/7/8/9/myFile...).

If the data arrives as bursts of messages per tenant-type we see good write performance, but when the data is distributed more like white noise across thousands of tenants, dozens of data types, and multiple extraction IDs, we see a dramatic loss of performance (on the order of 300x).

Attaching a debugger, the issue seems connected to the number of handlers open at the same time on S3 to write data. More specifically:
https://jira2.workday.com/secure/attachment/2947228/2947228_image-2019-06-23-22-46-43-980.png

Researching the Hadoop libraries used to write to S3, I found some settings that looked like possible improvements:
<name>fs.s3a.connection.maximum</name>
<name>fs.s3a.threads.max</name>
<name>fs.s3a.threads.core</name>
<name>fs.s3a.max.total.tasks</name>
But none of these made a big difference in throughput.

I hope to move the discussion forward and see whether we can find a clear issue in the logic or a possible workaround.

Note: The tests were done on Flink 1.8 with the Hadoop FileSystem (BucketingSink).
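For context, these s3a options are Hadoop configuration properties, typically set in core-site.xml (or passed through to the Hadoop configuration by Flink). A sketch of how they are set; the values below are only illustrative, not a recommendation:

```xml
<configuration>
  <property>
    <name>fs.s3a.connection.maximum</name>
    <value>200</value> <!-- max simultaneous HTTP connections to S3; illustrative value -->
  </property>
  <property>
    <name>fs.s3a.threads.max</name>
    <value>64</value> <!-- max threads in the upload thread pool; illustrative value -->
  </property>
  <property>
    <name>fs.s3a.threads.core</name>
    <value>32</value> <!-- core threads kept alive in the pool; illustrative value -->
  </property>
  <property>
    <name>fs.s3a.max.total.tasks</name>
    <value>128</value> <!-- queued upload tasks before blocking; illustrative value -->
  </property>
</configuration>
```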
Hi Enrico,
Thanks for opening the discussion! One thing to note that may help is that the Hadoop S3 FS tries to imitate a filesystem on top of S3:
- before writing a key, it checks whether the "parent directory" exists by checking for a key with the prefix up to the last "/"
- it creates empty marker files to mark the existence of such a parent directory
- all these "existence" requests are S3 HEAD requests, which are expensive

As a result, the Hadoop S3 FS has very high "create file" latency and hits request rate limits very quickly. This may not play well with your directory structure. The fact that it works well when messages for a specific directory come in bursts may be explained by some form of internal caching done by Hadoop, but I am not sure. In any case, it may also be helpful to post your findings to the Hadoop community to see if they have any answers.

Finally, two recommendations:
1) try the presto S3 FS instead of the Hadoop one, as presto seems to be more efficient in that regard, and please report back if you notice any changes
2) try moving to the StreamingFileSink, as the BucketingSink has been deprecated for quite a while (note, though, that presto-s3 is not supported by the StreamingFileSink)

Cheers,
Kostas

On Tue, Oct 15, 2019 at 3:47 PM Enrico Agnoli <[hidden email]> wrote:
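To make the cost concrete, here is a rough model, not Hadoop's actual code, of why a deeply nested layout multiplies those existence probes: one HEAD-style check per ancestor prefix of the key. The key below is hypothetical, mirroring the 9-layer structure described in the thread.

```java
import java.util.ArrayList;
import java.util.List;

public class ParentCheckModel {

    // Simplified model: before creating a key, a Hadoop-style S3 FS may probe
    // every ancestor prefix (one HEAD request each) to check whether the
    // "directory" exists.
    static List<String> ancestorPrefixes(String key) {
        List<String> prefixes = new ArrayList<>();
        int idx = key.indexOf('/');
        while (idx >= 0) {
            prefixes.add(key.substring(0, idx + 1));
            idx = key.indexOf('/', idx + 1);
        }
        return prefixes;
    }

    public static void main(String[] args) {
        // Hypothetical 9-layer key like the one described in the thread.
        String key = "1/2/3/4/5/6/7/8/9/part-0-0.parquet";
        List<String> probes = ancestorPrefixes(key);
        System.out.println(probes.size() + " existence probes for one file create");
        // With thousands of tenants writing to distinct directories, these
        // probes multiply and quickly hit S3 request-rate limits.
    }
}
```

In this model each new file in a fresh 9-layer directory costs 9 extra requests before any data is written, which is why burst-per-directory traffic (where the prefixes repeat) behaves so much better than white-noise traffic.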
I finally found the time to dig a little more on this and found the real problem.
The culprit of the slowdown is this piece of code:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L543-L551

This alone takes around 4-5 seconds, out of a total of 6 seconds to open the file. Logs from an instrumented call:

2020-02-07 08:51:05,825 INFO BucketingSink - openNewPartFile FS verification
2020-02-07 08:51:09,906 INFO BucketingSink - openNewPartFile FS verification - done
2020-02-07 08:51:11,181 INFO BucketingSink - openNewPartFile FS - completed partPath = s3a://....

Combined with the bucketing sink's default 60-second inactivity rollover
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L195
this means that with more than 10 parallel buckets on a slot, by the time we finish creating the last bucket the first one has already become stale and needs to be rotated, creating a blocking situation.

We solved this by deleting the FS check mentioned above (opening a file now takes ~1.2 s) and raising the inactive threshold to 5 minutes. With these changes we can easily handle more than 200 buckets per slot (once the job picks up speed it ingests on all slots, which postpones the inactive timeout).

-Enrico
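The arithmetic behind the blocking can be sketched as follows (the numbers are taken from the measurements above; this is an illustration of the reasoning, not Flink code):

```java
public class RolloverMath {
    public static void main(String[] args) {
        double secondsPerFileOpen = 6.0;      // measured: ~6 s to open one part file
        double inactivityRolloverSec = 60.0;  // BucketingSink default inactive threshold

        // How many buckets can one slot open back-to-back before the first
        // bucket it opened crosses the inactivity threshold and must be
        // rotated, turning bucket creation into a self-sustaining backlog?
        int bucketsBeforeFirstGoesStale =
                (int) (inactivityRolloverSec / secondsPerFileOpen);
        System.out.println("buckets before the first goes stale: "
                + bucketsBeforeFirstGoesStale);   // 60 / 6 = 10

        // After the fix: ~1.2 s per open and a 5-minute inactive threshold.
        double fixedOpenSec = 1.2;
        double fixedRolloverSec = 300.0;
        System.out.println("after the fix: "
                + (int) (fixedRolloverSec / fixedOpenSec));  // 300 / 1.2 = 250
    }
}
```

This matches the observations in the thread: beyond roughly 10 buckets per slot the sink starts rotating files as fast as it creates them, while the fixed setup leaves headroom for well over 200 buckets per slot.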
Hi Enrico,
Nice to hear from you and thanks for checking it out! This can be helpful for people still using the BucketingSink, but I would recommend switching to the StreamingFileSink, which is the "new version" of the BucketingSink. In fact, the BucketingSink is going to be removed in one of the upcoming releases, as it has been deprecated for quite a while. If you try the StreamingFileSink, let us know if the problem persists.

Cheers,
Kostas

On Fri, Feb 7, 2020 at 11:20 AM Enrico Agnoli <[hidden email]> wrote: