Rewriting a new file instead of writing a ".valid-length" file in BucketingSink when restoring


Xinyu Zhang
Hi,


I'm trying to copy data from Kafka to HDFS. The data in HDFS is then used by
other people for further computations in map/reduce.
If some tasks fail, a ".valid-length" file is created on older Hadoop
versions. The problem is that everyone who consumes the data must know how to
handle the ".valid-length" file; otherwise, the data may not be exactly-once.
So why not rewrite a new file when restoring, instead of writing a
".valid-length" file? That way, others who use the data in HDFS wouldn't need
to know how to handle the ".valid-length" file.
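
For example, today every downstream job needs logic roughly like the
following just to read the data correctly. This is only a rough sketch: it
assumes BucketingSink's default marker prefix/suffix and that the length is
stored via writeUTF, so please verify the details against your version.

    import java.io.InputStream;

    import com.google.common.io.ByteStreams;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ValidLengthAwareReader {

        // Opens a bucket file, honoring its valid-length marker if present.
        // Bytes past the recorded length were written after the last
        // successful checkpoint and must be skipped for exactly-once results.
        public static InputStream open(FileSystem fs, Path dataFile) throws Exception {
            // BucketingSink defaults: marker = "_" + <file name> + ".valid-length"
            Path marker = new Path(dataFile.getParent(),
                    "_" + dataFile.getName() + ".valid-length");
            FSDataInputStream in = fs.open(dataFile);
            if (!fs.exists(marker)) {
                return in; // no restore happened; the whole file is valid
            }
            long validLength;
            try (FSDataInputStream markerIn = fs.open(marker)) {
                // Assumption: the sink stores the length as a string via writeUTF.
                validLength = Long.parseLong(markerIn.readUTF());
            }
            return ByteStreams.limit(in, validLength); // Guava: cap reads at validLength
        }
    }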


Thanks!


Zhang Xinyu

Re: Rewriting a new file instead of writing a ".valid-length" file in BucketingSink when restoring

Gary Yao
Hi,

The BucketingSink truncates the file if the Hadoop FileSystem supports this
operation (Hadoop 2.7 and above) [1]. What version of Hadoop are you using?

Best,
Gary

[1]
https://github.com/apache/flink/blob/bcd028d75b0e5c5c691e24640a2196b2fdaf85e0/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L301
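
For context, the check behind [1] is essentially a reflective lookup for the
truncate method, roughly like this. This is a heavily simplified sketch; the
actual code in [1] also verifies that truncate really works before using it.

    import java.lang.reflect.Method;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Simplified sketch: Hadoop < 2.7 has no FileSystem#truncate, so the
    // method is looked up reflectively. When it is missing, restore falls
    // back to writing ".valid-length" marker files instead of truncating.
    static Method reflectTruncate(FileSystem fs) {
        try {
            // Hadoop 2.7+: boolean truncate(Path f, long newLength)
            return fs.getClass().getMethod("truncate", Path.class, long.class);
        } catch (NoSuchMethodException e) {
            return null; // no truncate available -> ".valid-length" fallback
        }
    }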


Re: Rewriting a new file instead of writing a ".valid-length" file in BucketingSink when restoring

Timo Walther-2
I guess writing a new file would take much longer than just using the
.valid-length file, especially if the files are very large. The restore
time should be kept as short as possible to ensure minimal downtime on
restarts.

Regards,
Timo





Re: Rewriting a new file instead of writing a ".valid-length" file in BucketingSink when restoring

Xinyu Zhang
Thanks for your reply.
Indeed, if a file is very large, it will take a long time. However, the
".valid-length" file is inconvenient for others who use the data in HDFS.
Maybe we should provide a configuration option so that users can choose
which strategy they prefer.
Do you have any ideas?

Re: Rewriting a new file instead of writing a ".valid-length" file in BucketingSink when restoring

Amit Jain
In reply to this post by Timo Walther-2
Hi Timo,

Shouldn't we delegate the recovery option to the user?
I think we could ask the user to provide a Reader for the respective Writer
class, and save the valid-length info in operator state in addition to the
current flow. Depending on the recovery option the user chooses, we could
stream the Reader's output through the Writer class up to the valid-length
limit.

We also see a potential issue with eventually consistent storage such as S3:
its consumers can intermittently read stale data, or read duplicate data if
the wrong filename is chosen.
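
To make that concrete, here is a rough sketch of the rewrite path on
restore. All names are made up for illustration, and this copies raw bytes;
a real version would go through the user-supplied Reader/Writer.

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical rewrite-on-restore: copy the first validLength bytes of
    // the partial file into a fresh file and swap it in. A real version
    // would use the user-supplied Reader/Writer so that record boundaries,
    // compression and custom formats are respected.
    static void rewriteOnRestore(FileSystem fs, Path file, long validLength) throws Exception {
        Path tmp = new Path(file.getParent(), file.getName() + ".rewrite");
        byte[] buf = new byte[64 * 1024];
        long remaining = validLength;
        try (FSDataInputStream in = fs.open(file);
                FSDataOutputStream out = fs.create(tmp, true)) {
            while (remaining > 0) {
                int read = in.read(buf, 0, (int) Math.min(buf.length, remaining));
                if (read < 0) {
                    break;
                }
                out.write(buf, 0, read);
                remaining -= read;
            }
        }
        // Delete + rename is not atomic, which is exactly the stale/duplicate
        // read problem on eventually consistent stores like S3 noted above.
        fs.delete(file, false);
        fs.rename(tmp, file);
    }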
--
Thanks,
Amit


Re: Rewriting a new file instead of writing a ".valid-length" file in BucketingSink when restoring

Timo Walther-2
In reply to this post by Xinyu Zhang
As far as I know, the bucketing sink is currently also limited by
relying on Hadoop's file system abstraction. It is planned to switch to
Flink's file system abstraction, which might also improve this situation.
Kostas (in CC) might know more about it.

But I think we can discuss whether another behavior should be configurable
as well. Would you be willing to contribute?

Regards,
Timo




Re: Rewriting a new file instead of writing a ".valid-length" file in BucketingSink when restoring

Xinyu Zhang
Yes, I'd be glad to do it, but I'm not sure whether writing a new file is a
good solution, so I want to discuss it here.
Do you have any ideas? @Kostas

Re: Rewriting a new file instead of writing a ".valid-length" file in BucketingSink when restoring

Till Rohrmann
Hi Xinyu,

would it help to have a small tool that can truncate the finished files
that have an associated valid-length file? That way, one could run this
tool before others use the data further downstream.
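
For illustration, a minimal sketch of such a tool, assuming Hadoop 2.7+ for
FileSystem#truncate and BucketingSink's default marker naming and length
encoding (both are configurable, so treat those details as assumptions):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Standalone cleanup tool: for every "_<name>.valid-length" marker in
    // the given directory, truncate the data file to the recorded length,
    // then delete the marker. Run once before downstream jobs read a bucket.
    public class TruncateValidLengthFiles {

        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            for (FileStatus status : fs.listStatus(new Path(args[0]))) {
                Path marker = status.getPath();
                String name = marker.getName();
                if (!name.startsWith("_") || !name.endsWith(".valid-length")) {
                    continue; // not a valid-length marker
                }
                long validLength;
                try (FSDataInputStream in = fs.open(marker)) {
                    // Assumes the length was written via writeUTF.
                    validLength = Long.parseLong(in.readUTF());
                }
                Path dataFile = new Path(marker.getParent(),
                        name.substring(1, name.length() - ".valid-length".length()));
                // Hadoop 2.7+ only; truncate may return false while block
                // recovery is still in progress, so a real tool should retry.
                fs.truncate(dataFile, validLength);
                fs.delete(marker, false);
            }
        }
    }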

Cheers,
Till


Re: Rewriting a new file instead of writing a ".valid-length" file in BucketingSink when restoring

Xinyu Zhang-2
Hi Till,


Thanks for your suggestion. A small tool could run lightly and asynchronously.
However, I don't know when others will use the data, so the tool would have
to check for and truncate the finished files as soon as a valid-length file
appears. I think that would be hard to maintain, and it shouldn't be users
who have to maintain it (just as with the current implementation of
BucketingSink and its truncate support).


Regards,
Zhang Xinyu



Re: Rewriting a new file instead of writing a ".valid-length" file in BucketingSink when restoring

Xinyu Zhang
In reply to this post by Gary Yao
Hi Gary,


Our Hadoop version is quite old and does not support the truncate function;
some people are working on upgrading it.
I just think the ".valid-length" file is not friendly enough for users. At
the very least, whether to use the ".valid-length" file should be
configurable.


Regards,
Xinyu Zhang

