Re: [Question] How to use different filesystem between checkpoint data and user data sink


Re: [Question] How to use different filesystem between checkpoint data and user data sink

vino yang
Hi ouywl,

>>    Thread.currentThread().getContextClassLoader();

What does this statement mean in your program?

In addition, can you share your implementation of the custom file
system plugin and the related exception?

Best,
Vino

ouywl <[hidden email]> wrote on Wed, Dec 18, 2019 at 4:59 PM:

> Hi all,
>     We have implemented a filesystem plugin to sink data to hdfs1, while the
> YARN cluster that Flink runs on uses hdfs2. When the job runs, the
> jobmanager uses the conf of hdfs1 to create its filesystem, and the filesystem
> plugin conflicts with Flink's own components.
>     Our implementation steps:
>       1. 'FileSystemEnhance' extends "FileSystem"
>       2. 'FileSystemFactoryEnhance' implements "FileSystemFactory" and adds
> Kerberos auth in "FileSystemFactoryEnhance"
>       3. Add a service entry: create a file
> META-INF/services/org.apache.flink.core.fs.FileSystemFactory which
> contains the class name of "FileSystemFactoryEnhance"
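Step 3, the ServiceLoader entry, can be sketched like this; the directory layout and the factory's package name (`com.example.fs`) are illustrative assumptions, not taken from the original post:

```shell
# Register the custom factory via Java's ServiceLoader mechanism: the jar
# must carry a META-INF/services file named after the Flink interface,
# containing the fully qualified name of the implementing class.
# "com.example.fs.FileSystemFactoryEnhance" is a placeholder class name.
mkdir -p fsplugin/META-INF/services
echo "com.example.fs.FileSystemFactoryEnhance" \
  > fsplugin/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
# The file holds one fully qualified factory class name per line:
cat fsplugin/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
```

The `fsplugin/` tree would then be packed into the plugin jar alongside the compiled classes.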
>
> And the job's main class is:
>
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(60 * 1000);
>         env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>         env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>         env.getConfig().enableSysoutLogging();
>
>         Properties props = new Properties();
>         props.put("bootstrap.servers", SERVERS);
>         props.put("group.id", GROUPID);
>         props.put("enable.auto.commit", "true");
>         // props.put("auto.commit.interval.ms", "1000");
>         props.put("session.timeout.ms", "30000");
>         props.put("auto.offset.reset", "latest");
>         props.put("key.deserializer", org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StringDeserializer.class.getName());
>         props.put("value.deserializer", StringDeserializer.class.getName());
>
>         FlinkKafkaConsumer010 consumer011 = new FlinkKafkaConsumer010<String>("zyf_test_2", new SimpleStringSchema(), props);
>         DataStream<String> source = env.addSource(consumer011).setParallelism(1);
>         source.print();
>
>         Thread.currentThread().getContextClassLoader();
>
>         StreamingFileSink sink = StreamingFileSink
>                 .forRowFormat(new Path("hdfs://bdms-test/user/sloth/zyf"), new SimpleStringEncoder<>("UTF-8"))
>                 .build();
>         source.addSink(sink);
>         env.execute();
>     }
>
> When we start the job, the jobmanager's filesystem setup fails; the log shows the
> jobmanager picked up the "FileSystemFactoryEnhance" filesystem, which conflicts.
>
> Per https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems,
> how can we avoid relying on "Thread.currentThread().getContextClassLoader()"?
>
>
> ouywl
> [hidden email]
>
>
>

Re: [Question] How to use different filesystem between checkpoint data and user data sink

Yang Wang
You could try the new plugin mechanism.
Create a new directory named "myhdfs" under $FLINK_HOME/plugins, and then
put your filesystem-related jars in it.
Different plugins are loaded by separate classloaders to avoid conflicts.
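The layout Yang describes can be sketched as follows; the jar name is an illustrative assumption, and a scratch directory stands in for the real Flink install:

```shell
# Each plugin gets its own subdirectory under plugins/; Flink gives each
# subdirectory its own classloader, so jars in "myhdfs" cannot clash with
# jars in other plugins or on Flink's main classpath.
FLINK_HOME="$(mktemp -d)"    # stand-in for the real install dir
mkdir -p "$FLINK_HOME/plugins/myhdfs"
# In a real setup this jar comes from your build; empty placeholder here.
touch "$FLINK_HOME/my-filesystem-plugin.jar"
cp "$FLINK_HOME/my-filesystem-plugin.jar" "$FLINK_HOME/plugins/myhdfs/"
ls "$FLINK_HOME/plugins/myhdfs"    # prints: my-filesystem-plugin.jar
```

By contrast, a jar dropped into lib/ lands on the main classpath and is loaded by the parent classloader, which is presumably where the reported conflict came from.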


Best,
Yang


Re: [Question] How to use different filesystem between checkpoint data and user data sink

Piotr Nowojski-3
Hi,

As Yang Wang pointed out, you should use the new plugins mechanism.

If it doesn’t work, first make sure that you are shipping/distributing the plugin jars correctly: the plugins directory structure must be correct on the client machine. Next, make sure the cluster has the same correct setup. This is especially important for the standalone/cluster execution modes. For YARN, Mesos, and Docker, the plugins dir should be shipped to the cluster by Flink itself; however, plugin support on YARN is currently semi-broken [1]. This is already fixed, but waiting to be released in 1.9.2 and 1.10.

If it still doesn’t work, check the TaskManager logs for which plugins/file systems are loaded during startup. If none are loaded, that's the problem.
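A quick way to perform that check, sketched against a simulated log file; the exact wording of Flink's plugin log messages varies by version, so both the sample lines and the grep pattern are assumptions:

```shell
# Simulate a TaskManager log; in practice, grep the files under $FLINK_HOME/log.
LOG="$(mktemp)"
cat > "$LOG" <<'EOF'
INFO  org.apache.flink.core.plugin.PluginManager - initializing plugin myhdfs
INFO  org.apache.flink.core.fs.FileSystem - loading file systems via plugins
EOF
# Count startup lines mentioning plugins; zero matches on a real log would
# mean no plugin was picked up at all.
grep -ci "plugin" "$LOG"    # prints: 2
```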

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-14382



Re: [Question] How to use different filesystem between checkpoint data and user data sink

Piotr Nowojski-3
In reply to this post by Yang Wang
Hi,

Can you share the full stack trace, or just attach the jobmanager and taskmanager logs? This exception should have a cause logged below it.

Piotrek

> On 19 Dec 2019, at 04:06, ouywl <[hidden email]> wrote:
>
> Hi Piotr Nowojski,
>    I have moved my filesystem plugin to FLINK_HOME/plugins in Flink 1.9.1. The jobmanager doesn't start up, and it loads the filesystem plugin from my own plugin jar. The log is:
>   “2019-12-19 10:58:32,394 WARN org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'high-availability.zookeeper.storageDir' instead of proper key 'high-availability.storageDir'
> 2019-12-19 10:58:32,398 INFO  com.filesystem.plugin.FileSystemFactoryEnhance                -  trying to get hadoopEnv, hadoopPath = /conf/hadoop_conf
> 2019-12-19 10:58:32,434 WARN  org.apache.hadoop.conf.Configuration                          - /tmp/mammut-core-site.xml:an attempt to override final parameter: fs.defaultFS;  Ignoring.
> 2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration                          - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.data.dir;  Ignoring.
> 2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration                          - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.datanode.failed.volumes.tolerated;  Ignoring.
> 2019-12-19 10:58:32,436 WARN  org.apache.hadoop.conf.Configuration                          - /tmp/mammut-hdfs-site.xml:an attempt to override final parameter: dfs.namenode.name.dir;  Ignoring.
> 2019-12-19 10:58:32,878 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Shutting YarnJobClusterEntrypoint down with application status FAILED. Diagnostics java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir)
> at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:119)
> at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
> at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:120)
> at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:292)
> at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:257)
> at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:202)
> at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:164)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:163)
> at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:501)
>  at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)"
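The stack trace above shows the failure happening while creating the FileSystem for high-availability.storageDir, i.e. Flink's own recovery state is being routed through the custom factory. One common way to avoid that clash is to register the custom factory under its own URI scheme, so plain hdfs:// paths keep resolving through Flink's built-in Hadoop filesystem. A sketch of the corresponding flink-conf.yaml, with illustrative paths and a hypothetical `enhance://` scheme:

```yaml
# Sketch only: keep Flink-internal state on the default hdfs:// filesystem,
# while the job's sink writes through the plugin under a distinct scheme
# (the scheme name "enhance" is an assumption for illustration).
high-availability.storageDir: hdfs:///flink/recovery
state.checkpoints.dir: hdfs:///flink/checkpoints
# The StreamingFileSink path in the job would then use enhance://... paths.
```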
>
>
> ouywl
> [hidden email]