An exception about checkpoint

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

An exception about checkpoint

Leor Li
Hey,

Recently I used flink1.6.3 to run a task and the following exception
occurred. Can someone tell me how to solve this problem?

AsynchronousException{java.lang.Exception: Could not materialize checkpoint
10631 for operator Source: Custom Source -> Flat Map -> (Map ->
Timestamps/Watermarks -> from: (dtEventTime, bkdata_par_offset, N_Route,
iEventId, vVersion, vFromIp, vLocalIp, iRequestType, vAppid, vOpenid,
vCmdid, vL5_ip_port, iPermission, iResult, dProcessTime, vErrorMsg, vUrl,
vDeviceInfo, rowtime) -> where: (OR(=(iResult, -308), =(iResult, -309))),
select: (rowtime, vErrorMsg) -> time attribute: (rowtime), Map) (8/20).}
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 10631 for
operator Source: Custom Source -> Flat Map -> (Map -> Timestamps/Watermarks
-> from: (dtEventTime, bkdata_par_offset, N_Route, iEventId, vVersion,
vFromIp, vLocalIp, iRequestType, vAppid, vOpenid, vCmdid, vL5_ip_port,
iPermission, iResult, dProcessTime, vErrorMsg, vUrl, vDeviceInfo, rowtime)
-> where: (OR(=(iResult, -308), =(iResult, -309))), select: (rowtime,
vErrorMsg) -> time attribute: (rowtime), Map) (8/20).
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException:
Could not flush and close the file system output stream to
hdfs:/app/flink/checkpoints/918f49d9eb23bdea016e865182718a06/chk-10631/6e86cce6-9ac9-4856-b0a3-42c81fcafadc
in order to obtain the stream state handle
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
Caused by: java.io.IOException: Could not flush and close the file system
output stream to
hdfs:/app/flink/checkpoints/918f49d9eb23bdea016e865182718a06/chk-10631/6e86cce6-9ac9-4856-b0a3-42c81fcafadc
in order to obtain the stream state handle
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:454)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:359)
at
org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
No lease on
/app/flink/checkpoints/918f49d9eb23bdea016e865182718a06/chk-10631/6e86cce6-9ac9-4856-b0a3-42c81fcafadc
(inode 7450914863): File does not exist. Holder
DFSClient_NONMAPREDUCE_-1995220833_1 does not have any open files.
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3602)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3690)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3660)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:729)
at
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.complete(AuthorizationProviderProxyClientProtocol.java:243)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:527)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)

at org.apache.hadoop.ipc.Client.call(Client.java:1468)
at org.apache.hadoop.ipc.Client.call(Client.java:1399)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at com.sun.proxy.$Proxy9.complete(Unknown Source)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:443)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy10.complete(Unknown Source)
at
org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2283)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2267)
at
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at
org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
at
org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:314)
... 12 more
Reply | Threaded
Open this post in threaded view
|

Re: An exception about checkpoint

Congxian Qiu
Hi, Leor

Seems like the checkpoint file
`/app/flink/checkpoints/918f49d9eb23bdea016e865182718a06/chk-10631/6e86cce6-9ac9-4856-b0a3-42c81fcafadc`
did not exist for some reason, you can check the life cycle of this file
from hdfs audit log and find out why the file did not exist. maybe the
checkpoint directory has been removed because the checkpoint 10631 has been
subsumed[1][2].

[1] https://issues.apache.org/jira/browse/FLINK-10930
[2] https://issues.apache.org/jira/browse/FLINK-10724

Leor Li <[hidden email]> 于2019年1月13日周日 下午6:59写道:

> Hey,
>
> Recently I used flink1.6.3 to run a task and the following exception
> occurred. Can someone tell me how to solve this problem?
>
> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 10631 for operator Source: Custom Source -> Flat Map -> (Map ->
> Timestamps/Watermarks -> from: (dtEventTime, bkdata_par_offset, N_Route,
> iEventId, vVersion, vFromIp, vLocalIp, iRequestType, vAppid, vOpenid,
> vCmdid, vL5_ip_port, iPermission, iResult, dProcessTime, vErrorMsg, vUrl,
> vDeviceInfo, rowtime) -> where: (OR(=(iResult, -308), =(iResult, -309))),
> select: (rowtime, vErrorMsg) -> time attribute: (rowtime), Map) (8/20).}
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 10631 for
> operator Source: Custom Source -> Flat Map -> (Map -> Timestamps/Watermarks
> -> from: (dtEventTime, bkdata_par_offset, N_Route, iEventId, vVersion,
> vFromIp, vLocalIp, iRequestType, vAppid, vOpenid, vCmdid, vL5_ip_port,
> iPermission, iResult, dProcessTime, vErrorMsg, vUrl, vDeviceInfo, rowtime)
> -> where: (OR(=(iResult, -308), =(iResult, -309))), select: (rowtime,
> vErrorMsg) -> time attribute: (rowtime), Map) (8/20).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException:
> Could not flush and close the file system output stream to
> hdfs:/app/flink/checkpoints/918f49d9eb23bdea016e865182718a06/chk-10631/6e86cce6-9ac9-4856-b0a3-42c81fcafadc
> in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> ... 5 more
> Caused by: java.io.IOException: Could not flush and close the file system
> output stream to
> hdfs:/app/flink/checkpoints/918f49d9eb23bdea016e865182718a06/chk-10631/6e86cce6-9ac9-4856-b0a3-42c81fcafadc
> in order to obtain the stream state handle
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:454)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:359)
> at
> org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
> ... 7 more
> Caused by:
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
> No lease on
> /app/flink/checkpoints/918f49d9eb23bdea016e865182718a06/chk-10631/6e86cce6-9ac9-4856-b0a3-42c81fcafadc
> (inode 7450914863): File does not exist. Holder
> DFSClient_NONMAPREDUCE_-1995220833_1 does not have any open files.
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3602)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3690)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3660)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:729)
> at
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.complete(AuthorizationProviderProxyClientProtocol.java:243)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:527)
> at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)
>
> at org.apache.hadoop.ipc.Client.call(Client.java:1468)
> at org.apache.hadoop.ipc.Client.call(Client.java:1399)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
> at com.sun.proxy.$Proxy9.complete(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:443)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy10.complete(Unknown Source)
> at
> org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2283)
> at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2267)
> at
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
> at
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
> at
> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:314)
> ... 12 more
>


--
Best,
Congxian