HadoopOutputFormat has issues with LocalExecutionEnvironment?

HadoopOutputFormat has issues with LocalExecutionEnvironment?

Ken Krugler
Hi devs,

In HadoopOutputFormat.close(), I see code that tries to rename <outputPath>/tmp-r-00001 to <outputPath>/1.
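
In other words, what close() expects amounts to something like this (just a simplified sketch of my reading of it, not the actual Flink source; the method name and the "tmp-r-%05d" format are my own):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CloseRenameSketch {

        // Simplified sketch (not the actual Flink code): the task's temp file is expected
        // directly under <outputPath>, and gets renamed to the task number.
        static void renameTaskOutput(Path outputPath, int taskNumber) throws IOException {
            FileSystem fs = outputPath.getFileSystem(new Configuration());
            Path tmpFile = new Path(outputPath, String.format("tmp-r-%05d", taskNumber)); // e.g. tmp-r-00001
            Path finalFile = new Path(outputPath, Integer.toString(taskNumber));          // e.g. 1
            if (fs.exists(tmpFile)) {
                fs.rename(tmpFile, finalFile);
            }
        }
    }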

But when I run my Flink 1.9.2 code using a local MiniCluster, the actual location of the tmp-r-00001 file is:

<outputPath>/_temporary/0/task__0000_r_000001/tmp-r-00001

I think this is because the default behavior of Hadoop’s FileOutputCommitter (with algorithm == 1) is to put files in task-specific sub-dirs.

It then depends on a post-completion “merge paths” step being performed by what is (in Hadoop) the Application Master.

I assume that when running on a real cluster, the HadoopOutputFormat.finalizeGlobal() method’s call to commitJob() would do this, but it doesn’t seem to be happening when I run locally.

If I set the algorithm version to 2, then “merge paths” is handled by FileOutputCommitter immediately, and the HadoopOutputFormat code finds files in the expected location.

I’m wondering if Flink should always use version 2 of the algorithm, since it’s more performant when there are a lot of result files (which is why it was added).

Thanks,

— Ken

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr

Re: HadoopOutputFormat has issues with LocalExecutionEnvironment?

Robert Metzger
Hi Ken,

sorry for the late reply. This could be a bug in Flink. Does the issue also occur on Flink 1.11?

Have you set a breakpoint in HadoopOutputFormat.finalizeGlobal() when running locally, to validate that this method doesn't get called?
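
(Untested idea, in case it's easier than a debugger: a thin wrapper that logs when the method is hit during a MiniCluster run. The class name is made up; it assumes the finalizeGlobal(int) signature that HadoopOutputFormat gets from FinalizeOnMaster.)

    import java.io.IOException;

    import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.OutputFormat;

    // Hypothetical helper, not part of Flink: logs whether finalizeGlobal() ever runs.
    public class LoggingHadoopOutputFormat<K, V> extends HadoopOutputFormat<K, V> {

        public LoggingHadoopOutputFormat(OutputFormat<K, V> mapreduceOutputFormat, Job job) {
            super(mapreduceOutputFormat, job);
        }

        @Override
        public void finalizeGlobal(int parallelism) throws IOException {
            // If this never shows up in the local run's output, finalizeGlobal() isn't being called.
            System.err.println("finalizeGlobal() called with parallelism = " + parallelism);
            super.finalizeGlobal(parallelism);
        }
    }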

What do you mean by "algorithm version 2"? Where can you set this? (Sorry
for the question, I'm not an expert with Hadoop's FileOutputCommitter)

Note to others: There's a related discussion here:
https://issues.apache.org/jira/browse/FLINK-19069

Best,
Robert


Re: HadoopOutputFormat has issues with LocalExecutionEnvironment?

Ken Krugler
Hi Robert,

I haven’t tried with 1.11 yet; it’s on my list.

I’ll be spending time on this tomorrow, so hopefully I’ll have more results.

As for setting the algorithm version to 2, I do it in code like this:

        import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
        import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

        import cascading.tuple.Tuple;

        Job job = Job.getInstance();
        job.getConfiguration().set("io.serializations", "cascading.tuple.hadoop.TupleSerialization");
        job.setOutputKeyClass(Tuple.class);
        job.setOutputValueClass(Tuple.class);

        // So that the FileOutputCommitter used by HadoopOutputFormat puts the resulting file(s)
        // at the top level of the output directory, where Flink's rename logic expects them,
        // without needing FinalizeOnMaster support.
        job.getConfiguration().setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2);

        SequenceFileOutputFormat.setOutputPath(job, new Path(_parentDir, bucket));

        HadoopOutputFormat<Tuple, Tuple> result = new HadoopOutputFormat<Tuple, Tuple>(
                new SequenceFileOutputFormat<Tuple, Tuple>(), job);

See also https://issues.apache.org/jira/browse/MAPREDUCE-4815
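
For completeness, that format then gets wired into the job roughly like this (sketch only; `pairs` is a stand-in for the DataSet<Tuple2<Tuple, Tuple>> my pipeline actually produces):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    import cascading.tuple.Tuple;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Stand-in data; HadoopOutputFormat<K, V> consumes Tuple2<K, V> records.
    DataSet<Tuple2<Tuple, Tuple>> pairs = env.fromElements(Tuple2.of(new Tuple(), new Tuple()));

    // `result` is the HadoopOutputFormat<Tuple, Tuple> built above.
    pairs.output(result);

    env.execute();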

— Ken

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr