[jira] [Created] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-13874) StreamingFileSink fails to recover (truncate) properly

Shang Yuanchun (Jira)
Gyula Fora created FLINK-13874:
----------------------------------

             Summary: StreamingFileSink fails to recover (truncate) properly
                 Key: FLINK-13874
                 URL: https://issues.apache.org/jira/browse/FLINK-13874
             Project: Flink
          Issue Type: New Feature
          Components: Connectors / FileSystem
    Affects Versions: 1.9.0
            Reporter: Gyula Fora


It seems that there might be some problem with the truncate / recovery logic for the HadoopRecoverableFsDataOutputStream.

I keep hitting the following error:

java.io.IOException: Problem while truncating file: hdfs:/user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.safelyTruncateFile(HadoopRecoverableFsDataOutputStream.java:166) at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream.<init>(HadoopRecoverableFsDataOutputStream.java:89) at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.recover(HadoopRecoverableWriter.java:72) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396) at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to TRUNCATE_FILE /user/root/flink/filesink/transaction-test1-text/2019-08-27--07/.part-1-1.inprogress.7e882941-ab98-4404-b16b-87a26256bf4d for DFSClient_NONMAPREDUCE_-1189574442_56 on 172.31.114.177 because DFSClient_NONMAPREDUCE_-1189574442_56 is already the current lease holder. at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2522) at org.apache.hadoop.hdfs.server.namenode.FSDirTruncateOp.truncate(FSDirTruncateOp.java:119) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:2091) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:1070) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:669) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:991) at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:869) at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:815) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2675)

 

It seems like we don't wait until the file is properly truncated before starting to write again but my analysis might be off.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)