Re0 Beatrice created FLINK-21891:
------------------------------------
Summary: The .staging_xxx directory isn't deleted after writing data to hive table in batch mode
Key: FLINK-21891
URL:
https://issues.apache.org/jira/browse/FLINK-21891 Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.12.0
Reporter: Re0 Beatrice
In flink 1.12.0, use Blink Planner to read data from Hbase and write the results to Hive via Flink SQL.
The .staging_xxx files on HDFS:
/user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616074732697
/user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616120408195
/user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616121007337
/user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616121607484
/user/hive/warehouse/user.db/t_hive_user_group_result/.staging_1616206808142
I found the following code in `org.apache.flink.table.filesystem.FileSystemOutputFormat` caused the problem:
{code:java}
import java.io.File;
@Override
public void finalizeGlobal(int parallelism) {
try {
FileSystemCommitter committer = new FileSystemCommitter(
fsFactory, msFactory, overwrite, tmpPath, partitionColumns.length);
committer.commitUpToCheckpoint(CHECKPOINT_ID);
} catch (Exception e) {
throw new TableException("Exception in finalizeGlobal", e);
} finally {
new File(tmpPath.getPath()).delete(); // the error code
}
}
{code}
The code in finally code block `new File(..)` can't convert `tmpPath` to HDFS file instance, I think the following code is more correct:
{code:java}
fsFactory.create(tmpPath.toUri()).delete(tmpPath, true);
{code}
A similar code has appeared in the class of PartitionTempFileManager.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)