Flink 1.10 with GCS for checkpoints

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink 1.10 with GCS for checkpoints

Ramya Ramamurthy
Hi,

We are trying to upgrade our Flink from 1.7 to 1.10. We have our
checkpoints on Google Cloud Storage today. But this is not working well
with 1.10.
And below is the error we get.
any help here would be appreciated.
We followed the below blog for GCS related configurations.
https://www.ververica.com/blog/getting-started-with-da-platform-on-google-kubernetes-engine


Excerpt from the error:





*org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:64)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 'gs'. The scheme is
not directly supported by Flink and no Hadoop file system to support this
scheme could be loaded.     at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)     at
org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)     at
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
 *

*Complete ERROR Stack:*

2020-06-15 04:46:00,783 WARN  org.apache.flink.configuration.Configuration
                 - Config uses deprecated configuration key
'high-availability.zookeeper.storageDir' instead of proper key
'high-availability.storageDir'
2020-06-15 04:46:00,804 INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Shutting
StandaloneSessionClusterEntrypoint down with application status FAILED.
Diagnostics java.io.IOException: Could not create FileSystem for highly
available storage path
(gs://ss-enigma-bucket/flink/flink/checkpoints/fs.default_ns)
    at
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103)
    at
org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
    at
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125)
    at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305)
    at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263)
    at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
    at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
    at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
    at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
    at
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:64)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 'gs'. The scheme is
not directly supported by Flink and no Hadoop file system to support this
scheme could be loaded.
    at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
    at
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
    ... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Hadoop is not in the classpath/dependencies.
    at
org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
    at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
    ... 13 more
.
2020-06-15 04:46:00,816 INFO
 org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopping
Akka RPC service.
2020-06-15 04:46:00,901 INFO
 akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting
down remote daemon.
2020-06-15 04:46:00,903 INFO
 akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote
daemon shut down; proceeding with flushing remote transports.
2020-06-15 04:46:00,948 INFO
 akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting
shut down.
2020-06-15 04:46:01,006 ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Could not
start cluster entrypoint StandaloneSessionClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.
    at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
    at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
    at
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:64)
Caused by: java.io.IOException: Could not create FileSystem for highly
available storage path
(gs://ss-enigma-bucket/flink/flink/checkpoints/fs.default_ns)
    at
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103)
    at
org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
    at
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125)
    at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305)
    at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263)
    at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
    at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
    at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
    ... 2 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 'gs'. The scheme is
not directly supported by Flink and no Hadoop file system to support this
scheme could be loaded.
    at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
    at
org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
    ... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Hadoop is not in the classpath/dependencies.
    at
org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
    at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
    ... 13 more
2020-06-15 04:46:01,009 INFO
 org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopped
Akka RPC service.

Thanks.
Reply | Threaded
Open this post in threaded view
|

Re: Flink 1.10 with GCS for checkpoints

Till Rohrmann
Hi Ramya,

it looks as if Flink cannot find the Hadoop dependencies. Could you make
sure that you start Flink with HADOOP_CLASSPATH defined or pointing it to
the Hadoop conf directory via HADOOP_CONF_DIR. See this link [1] for more
information on how to add Hadoop support.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html

Cheers,
Till

On Mon, Jun 15, 2020 at 10:44 AM Ramya Ramamurthy <[hidden email]> wrote:

> Hi,
>
> We are trying to upgrade our Flink from 1.7 to 1.10. We have our
> checkpoints on Google Cloud Storage today. But this is not working well
> with 1.10.
> And below is the error we get.
> any help here would be appreciated.
> We followed the below blog for GCS related configurations.
>
> https://www.ververica.com/blog/getting-started-with-da-platform-on-google-kubernetes-engine
>
>
> Excerpt from the error:
>
>
>
>
>
>
> *org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:64)
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'gs'. The scheme is
> not directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded.     at
>
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)     at
> org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)     at
>
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
>  *
>
> *Complete ERROR Stack:*
>
> 2020-06-15 04:46:00,783 WARN  org.apache.flink.configuration.Configuration
>                  - Config uses deprecated configuration key
> 'high-availability.zookeeper.storageDir' instead of proper key
> 'high-availability.storageDir'
> 2020-06-15 04:46:00,804 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Shutting
> StandaloneSessionClusterEntrypoint down with application status FAILED.
> Diagnostics java.io.IOException: Could not create FileSystem for highly
> available storage path
> (gs://ss-enigma-bucket/flink/flink/checkpoints/fs.default_ns)
>     at
>
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103)
>     at
>
> org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
>     at
>
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125)
>     at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305)
>     at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263)
>     at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
>     at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
>     at
>
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>     at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
>     at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
>     at
>
> org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:64)
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'gs'. The scheme is
> not directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded.
>     at
>
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
>     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>     at
>
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
>     ... 10 more
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Hadoop is not in the classpath/dependencies.
>     at
>
> org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
>     at
>
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
>     ... 13 more
> .
> 2020-06-15 04:46:00,816 INFO
>  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopping
> Akka RPC service.
> 2020-06-15 04:46:00,901 INFO
>  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting
> down remote daemon.
> 2020-06-15 04:46:00,903 INFO
>  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote
> daemon shut down; proceeding with flushing remote transports.
> 2020-06-15 04:46:00,948 INFO
>  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting
> shut down.
> 2020-06-15 04:46:01,006 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Could not
> start cluster entrypoint StandaloneSessionClusterEntrypoint.
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
> initialize the cluster entrypoint StandaloneSessionClusterEntrypoint.
>     at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:187)
>     at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
>     at
>
> org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:64)
> Caused by: java.io.IOException: Could not create FileSystem for highly
> available storage path
> (gs://ss-enigma-bucket/flink/flink/checkpoints/fs.default_ns)
>     at
>
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:103)
>     at
>
> org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
>     at
>
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:125)
>     at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:305)
>     at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:263)
>     at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
>     at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
>     at
>
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>     at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
>     ... 2 more
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'gs'. The scheme is
> not directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded.
>     at
>
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
>     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>     at
>
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
>     ... 10 more
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Hadoop is not in the classpath/dependencies.
>     at
>
> org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
>     at
>
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
>     ... 13 more
> 2020-06-15 04:46:01,009 INFO
>  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopped
> Akka RPC service.
>
> Thanks.
>