Hi,

I am trying to use OrcTableSource to fetch data stored in Hive tables on HDFS. I can fetch and deserialize the data with OrcTableSource on a local cluster, but when I use the HDFS path it throws a file-not-found error.

Any help on this will be appreciated.

Versions:

Flink: 1.7.1
Hive: 2.3.4

*Code snippet:*

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.orc.OrcTableSource;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;

final ExecutionEnvironment environment = ExecutionEnvironment
        .getExecutionEnvironment();
BatchTableEnvironment tableEnvironment =
        TableEnvironment.getTableEnvironment(environment);

// Build the ORC source from the HDFS directory and the ORC schema.
OrcTableSource orcTS = OrcTableSource.builder()
        .path("hdfs://host:port/logs/sa_structured_events")
        .forOrcSchema(new OrcSchemaProvider().getStructuredEventsSchema())
        .build();

// Register the source and query it through SQL.
tableEnvironment.registerTableSource("OrcTable", orcTS);
Table result = tableEnvironment.sqlQuery("SELECT * FROM OrcTable");

DataSet<Row> rowDataSet = tableEnvironment.toDataSet(result, Row.class);

tableEnvironment.execEnv().execute();

*Error:*

2019-10-14 16:56:26,048 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource (OrcFile[path=hdfs://host:port/logs/sa_structured_events, schema=struct<customerid:string,eventid:string,subtype:st) (1/1) (9e1ad40a0f0b80ef0ad8d3b2fc58816d) switched from RUNNING to FAILED.
java.io.FileNotFoundException: File /logs/sa_structured_events/part-00000-b2562d39-1097-490c-99dd-672ed12bbb10-c000.snappy.orc does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:635)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:861)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:625)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:442)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:146)
    at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:347)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
    at org.apache.orc.impl.ReaderImpl.extractFileTail(ReaderImpl.java:517)
    at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:364)
    at org.apache.orc.OrcFile.createReader(OrcFile.java:251)
    at org.apache.flink.orc.OrcRowInputFormat.open(OrcRowInputFormat.java:225)
    at org.apache.flink.orc.OrcRowInputFormat.open(OrcRowInputFormat.java:63)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Unknown Source)
2019-10-14 16:56:26,048 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Java Job at Mon Oct 14 16:56:07 IST 2019 (26a54fbcbd46cd0c4796e7308a2ba3b0) switched from state RUNNING to FAILING.
java.io.FileNotFoundException: File /logs/sa_structured_events/part-00000-b2562d39-1097-490c-99dd-672ed12bbb10-c000.snappy.orc does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:635)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:861)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:625)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:442)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:146)
    at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:347)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
    at org.apache.orc.impl.ReaderImpl.extractFileTail(ReaderImpl.java:517)
    at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:364)
    at org.apache.orc.OrcFile.createReader(OrcFile.java:251)
    at org.apache.flink.orc.OrcRowInputFormat.open(OrcRowInputFormat.java:225)
    at org.apache.flink.orc.OrcRowInputFormat.open(OrcRowInputFormat.java:63)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Unknown Source)

Regards,
Pritam.
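
A note on the trace above: the Hadoop frames are RawLocalFileSystem and ChecksumFileSystem, and the path in the exception carries no hdfs:// prefix, so the ORC reader looked for the file on the local file system rather than on HDFS. One possible explanation (an assumption, not confirmed anywhere in this thread) is that only the path component of the URI reached the Hadoop client, which then fell back to its default file system. The plain-Java sketch below only illustrates how the scheme and authority disappear when the path component is taken on its own; the class name and the address namenode:8020 are hypothetical.

```java
import java.net.URI;

public class PathSchemeDemo {

    // Returns only the path component of a URI string, discarding scheme and authority.
    public static String pathOnly(String uri) {
        return URI.create(uri).getPath();
    }

    // Returns the scheme ("hdfs", "file", ...) or null when the string has none.
    public static String schemeOf(String uri) {
        return URI.create(uri).getScheme();
    }

    public static void main(String[] args) {
        // Hypothetical NameNode address; the thread only shows the placeholder "host:port".
        String full = "hdfs://namenode:8020/logs/sa_structured_events";
        String stripped = pathOnly(full);

        System.out.println(schemeOf(full));      // hdfs
        System.out.println(stripped);            // /logs/sa_structured_events
        System.out.println(schemeOf(stripped));  // null
    }
}
```

A client that receives only the stripped form has no way to know the file lives on HDFS and has to rely on whatever default file system it is configured with.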
Maybe you can paste your Flink configuration and hdfs-site.xml so we can check whether there is a problem in the HDFS file-system configuration. You should also verify that this path really exists on HDFS with an hdfs shell command (e.g. hdfs dfs -ls /xxx; see https://hadoop.apache.org/docs/r2.7.5/hadoop-project-dist/hadoop-common/FileSystemShell.html).
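
For reference, the HDFS-related setting that such a check usually looks for is Hadoop's fs.defaultFS property in core-site.xml (Flink's own counterpart is fs.default-scheme in flink-conf.yaml). A minimal fragment is sketched below; the NameNode host and port are placeholders, and whether this setting is what is missing in this particular setup is an assumption, not something established in the thread.

```xml
<!-- core-site.xml: sketch only; namenode-host and 8020 are placeholder values -->
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://namenode-host:8020</value>
  </property>
</configuration>
```

When this property is absent, Hadoop clients fall back to the local file system (file:///) for paths that carry no scheme.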

At 2019-10-15 01:27:39, "Pritam Sadhukhan" <[hidden email]> wrote:
>Hi,
>
>I am trying to use orcsourcetable to fetch data stored in hive tables on
>hdfs.
>[...]

Thanks for the information.

I can see all the files with the hdfs shell command. I can even pull the data into Flink with

environment.readTextFile("hdfs://host:port/qlake/logs/sa_structured_events")

The issue occurs only with the ORC data source implementation. Here are my configuration files.

*flink-conf.yaml:*

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
################################################################################

#==============================================================================
# Common
#==============================================================================

# The external address of the host on which the JobManager runs and can be
# reached by the TaskManagers and any clients which want to connect. This setting
# is only used in Standalone mode and may be overwritten on the JobManager side
# by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
# In high availability mode, if you use the bin/start-cluster.sh script and setup
# the conf/masters file, this will be taken care of automatically. Yarn/Mesos
# automatically configure the host name based on the hostname of the node where the
# JobManager runs.

jobmanager.rpc.address: localhost

# The RPC port where the JobManager is reachable.

jobmanager.rpc.port: 6123

# The heap size for the JobManager JVM

jobmanager.heap.size: 1024m

# The heap size for the TaskManager JVM

taskmanager.heap.size: 1024m

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

taskmanager.numberOfTaskSlots: 8

# The parallelism used for programs that did not specify any other parallelism.

parallelism.default: 1

# The default file system scheme and authority.
#
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme

#==============================================================================
# High Availability
#==============================================================================

# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
#
# high-availability: zookeeper

# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
#
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...)
#
# high-availability.storageDir: hdfs:///flink/ha/

# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
#
# high-availability.zookeeper.quorum: localhost:2181

# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
# The default value is "open" and it can be changed to "creator" if ZK security is enabled
#
# high-availability.zookeeper.client.acl: open

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
# state.backend.incremental: false

#==============================================================================
# Web Frontend
#==============================================================================

# The address under which the web-based runtime monitor listens.
#
#web.address: 0.0.0.0

# The port under which the web-based runtime monitor listens.
# A value of -1 deactivates the web server.

rest.port: 8081

# Flag to specify whether job submission is enabled from the web-based
# runtime monitor. Uncomment to disable.

#web.submit.enable: false

#==============================================================================
# Advanced
#==============================================================================

# Override the directories for temporary files. If not specified, the
# system-specific Java temporary directory (java.io.tmpdir property) is taken.
#
# For framework setups on Yarn or Mesos, Flink will automatically pick up the
# containers' temp directories without any need for configuration.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
#     /data1/tmp:/data2/tmp:/data3/tmp
#
# Note: Each directory entry is read from and written to by a different I/O
# thread. You can include the same directory multiple times in order to create
# multiple I/O threads against that directory. This is for example relevant for
# high-throughput RAIDs.
#
# io.tmp.dirs: /tmp

# Specify whether TaskManager's managed memory should be allocated when starting
# up (true) or when memory is requested.
#
# We recommend to set this value to 'true' only in setups for pure batch
# processing (DataSet API). Streaming setups currently do not use the TaskManager's
# managed memory: The 'rocksdb' state backend uses RocksDB's own memory management,
# while the 'memory' and 'filesystem' backends explicitly keep data as objects
# to save on serialization cost.
#
# taskmanager.memory.preallocate: false

# The classloading resolve order. Possible values are 'child-first' (Flink's default)
# and 'parent-first' (Java's default).
#
# Child first classloading allows users to use different dependency/library
# versions in their application than those in the classpath. Switching back
# to 'parent-first' may help with debugging dependency issues.
#
# classloader.resolve-order: child-first

# The amount of memory going to the network stack. These numbers usually need
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, the default max is 1GB.
#
# taskmanager.network.memory.fraction: 0.1
# taskmanager.network.memory.min: 64mb
# taskmanager.network.memory.max: 1gb

#==============================================================================
# Flink Cluster Security Configuration
#==============================================================================

# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
# may be enabled in four steps:
# 1. configure the local krb5.conf file
# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
# 3. make the credentials available to various JAAS login contexts
# 4. configure the connector to use JAAS/SASL

# The below configure how Kerberos credentials are provided. A keytab will be used instead of
# a ticket cache if the keytab path and principal are set.

# security.kerberos.login.use-ticket-cache: true
# security.kerberos.login.keytab: /path/to/kerberos/keytab
# security.kerberos.login.principal: flink-user

# The configuration below defines which JAAS login contexts

# security.kerberos.login.contexts: Client,KafkaClient

#==============================================================================
# ZK Security Configuration
#==============================================================================

# Below configurations are applicable if ZK ensemble is configured for security

# Override below configuration to provide custom ZK service name if configured
# zookeeper.sasl.service-name: zookeeper

# The configuration below must match one of the values set in "security.kerberos.login.contexts"
# zookeeper.sasl.login-context-name: Client

#==============================================================================
# HistoryServer
#==============================================================================

# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)

# Directory to upload completed jobs to. Add this directory to the list of
# monitored directories of the HistoryServer as well (see below).
#jobmanager.archive.fs.dir: hdfs:///completed-jobs/

# The address under which the web-based HistoryServer listens.
#historyserver.web.address: 0.0.0.0

# The port under which the web-based HistoryServer listens.
#historyserver.web.port: 8082

# Comma separated list of directories to monitor for completed jobs.
#historyserver.archive.fs.dir: hdfs:///completed-jobs/

# Interval in milliseconds for refreshing the monitored directories.
#historyserver.archive.fs.refresh-interval: 10000

akka.ask.timeout: 1000 s
akka.client.timeout: 1000 s
akka.lookup.timeout: 1000 s
web.timeout: 1000000
taskmanager.debug.memory.log: true

*hdfs-site.xml:*

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>

</configuration>

On Tue, 15 Oct 2019 at 08:38, 刘芃成 <[hidden email]> wrote:
> Maybe you can paste your flink configuration and hdfs-site.xml and check
> if there are some problems on the hdfs fileSystem related conf.
> [...]

Can anyone please help me with the conf files?
Am I missing anything on the configuration part? Regards, Pritam. On Tue, 15 Oct 2019 at 08:48, Pritam Sadhukhan <[hidden email]> wrote: > Thanks for the information. > > I am able to see all the files using hdfs shell command. > Even I am able to pull the data on flink with > > environment.readTextFile("hdfs://host:port/qlake/logs/sa_structured_events") > > The issue is only with orcdatasource implementation. > Here is my configuration files. > > *flink-conf.yaml:* > > ################################################################################ > # Licensed to the Apache Software Foundation (ASF) under one > # or more contributor license agreements. See the NOTICE file > # distributed with this work for additional information > # regarding copyright ownership. The ASF licenses this file > # to you under the Apache License, Version 2.0 (the > # "License"); you may not use this file except in compliance > # with the License. You may obtain a copy of the License at > # > # http://www.apache.org/licenses/LICENSE-2.0 > # > # Unless required by applicable law or agreed to in writing, software > # distributed under the License is distributed on an "AS IS" BASIS, > # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > # See the License for the specific language governing permissions and > # limitations under the License. > > ################################################################################ > > > > #============================================================================== > # Common > > #============================================================================== > > # The external address of the host on which the JobManager runs and can be > > # reached by the TaskManagers and any clients which want to connect. This setting > > # is only used in Standalone mode and may be overwritten on the JobManager side > > # by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable. 
> > # In high availability mode, if you use the bin/start-cluster.sh script and setup > > # the conf/masters file, this will be taken care of automatically. Yarn/Mesos > > # automatically configure the host name based on the hostname of the node where the > # JobManager runs. > > jobmanager.rpc.address: localhost > > # The RPC port where the JobManager is reachable. > > jobmanager.rpc.port: 6123 > > > # The heap size for the JobManager JVM > > jobmanager.heap.size: 1024m > > > # The heap size for the TaskManager JVM > > taskmanager.heap.size: 1024m > > > > # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline. > > taskmanager.numberOfTaskSlots: 8 > > > # The parallelism used for programs that did not specify and other parallelism. > > parallelism.default: 1 > > # The default file system scheme and authority. > # > > # By default file paths without scheme are interpreted relative to the local > > # root file system 'file:///'. Use this to override the default and interpret > # relative paths relative to a different file system, > # for example 'hdfs://mynamenode:12345' > # > # fs.default-scheme > > > #============================================================================== > # High Availability > > #============================================================================== > > # The high-availability mode. Possible options are 'NONE' or 'zookeeper'. > # > # high-availability: zookeeper > > > # The path where metadata for master recovery is persisted. While ZooKeeper stores > > # the small ground truth for checkpoint and leader election, this location stores > # the larger objects, like persisted dataflow graphs. > # > # Must be a durable file system that is accessible from all nodes > # (like HDFS, S3, Ceph, nfs, ...) > # > # high-availability.storageDir: hdfs:///flink/ha/ > > # The list of ZooKeeper quorum peers that coordinate the high-availability > # setup. 
This must be a list of the form: > # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181) > # > # high-availability.zookeeper.quorum: localhost:2181 > > > # ACL options are based on > https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes > > # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE) > > # The default value is "open" and it can be changed to "creator" if ZK security is enabled > # > # high-availability.zookeeper.client.acl: open > > > #============================================================================== > # Fault tolerance and checkpointing > > #============================================================================== > > # The backend that will be used to store operator state checkpoints if > # checkpointing is enabled. > # > # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the > # <class-name-of-factory>. > # > # state.backend: filesystem > > > # Directory for checkpoints filesystem, when using any of the default bundled > # state backends. > # > # state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints > > # Default target directory for savepoints, optional. > # > # state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints > > # Flag to enable/disable incremental checkpoints for backends that > # support incremental checkpoints (like the RocksDB state backend). > # > # state.backend.incremental: false > > > #============================================================================== > # Web Frontend > > #============================================================================== > > # The address under which the web-based runtime monitor listens. > # > #web.address: 0.0.0.0 > > # The port under which the web-based runtime monitor listens. > # A value of -1 deactivates the web server. > > rest.port: 8081 > > # Flag to specify whether job submission is enabled from the web-based > # runtime monitor. Uncomment to disable. 
> > #web.submit.enable: false > > > #============================================================================== > # Advanced > > #============================================================================== > > # Override the directories for temporary files. If not specified, the > > # system-specific Java temporary directory (java.io.tmpdir property) is taken. > # > > # For framework setups on Yarn or Mesos, Flink will automatically pick up the > # containers' temp directories without any need for configuration. > # > # Add a delimited list for multiple directories, using the system directory > # delimiter (colon ':' on unix) or a comma, e.g.: > # /data1/tmp:/data2/tmp:/data3/tmp > # > # Note: Each directory entry is read from and written to by a different I/O > > # thread. You can include the same directory multiple times in order to create > > # multiple I/O threads against that directory. This is for example relevant for > # high-throughput RAIDs. > # > # io.tmp.dirs: /tmp > > > # Specify whether TaskManager's managed memory should be allocated when starting > # up (true) or when memory is requested. > # > # We recommend to set this value to 'true' only in setups for pure batch > > # processing (DataSet API). Streaming setups currently do not use the TaskManager's > > # managed memory: The 'rocksdb' state backend uses RocksDB's own memory management, > > # while the 'memory' and 'filesystem' backends explicitly keep data as objects > # to save on serialization cost. > # > # taskmanager.memory.preallocate: false > > > # The classloading resolve order. Possible values are 'child-first' (Flink's default) > # and 'parent-first' (Java's default). > # > # Child first classloading allows users to use different dependency/library > # versions in their application than those in the classpath. Switching back > # to 'parent-first' may help with debugging dependency issues. 
> #
> # classloader.resolve-order: child-first
>
> # The amount of memory going to the network stack. These numbers usually need
> # no tuning. Adjusting them may be necessary in case of an "Insufficient number
> # of network buffers" error. The default min is 64MB, the default max is 1GB.
> #
> # taskmanager.network.memory.fraction: 0.1
> # taskmanager.network.memory.min: 64mb
> # taskmanager.network.memory.max: 1gb
>
> #==============================================================================
> # Flink Cluster Security Configuration
> #==============================================================================
>
> # Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
> # may be enabled in four steps:
> # 1. configure the local krb5.conf file
> # 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
> # 3. make the credentials available to various JAAS login contexts
> # 4. configure the connector to use JAAS/SASL
>
> # The below configure how Kerberos credentials are provided. A keytab will be used instead of
> # a ticket cache if the keytab path and principal are set.
>
> # security.kerberos.login.use-ticket-cache: true
> # security.kerberos.login.keytab: /path/to/kerberos/keytab
> # security.kerberos.login.principal: flink-user
>
> # The configuration below defines which JAAS login contexts
>
> # security.kerberos.login.contexts: Client,KafkaClient
>
> #==============================================================================
> # ZK Security Configuration
> #==============================================================================
>
> # Below configurations are applicable if ZK ensemble is configured for security
>
> # Override below configuration to provide custom ZK service name if configured
> # zookeeper.sasl.service-name: zookeeper
>
> # The configuration below must match one of the values set in "security.kerberos.login.contexts"
> # zookeeper.sasl.login-context-name: Client
>
> #==============================================================================
> # HistoryServer
> #==============================================================================
>
> # The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)
>
> # Directory to upload completed jobs to. Add this directory to the list of
> # monitored directories of the HistoryServer as well (see below).
> #jobmanager.archive.fs.dir: hdfs:///completed-jobs/
>
> # The address under which the web-based HistoryServer listens.
> #historyserver.web.address: 0.0.0.0
>
> # The port under which the web-based HistoryServer listens.
> #historyserver.web.port: 8082
>
> # Comma separated list of directories to monitor for completed jobs.
> #historyserver.archive.fs.dir: hdfs:///completed-jobs/
>
> # Interval in milliseconds for refreshing the monitored directories.
> #historyserver.archive.fs.refresh-interval: 10000
>
> akka.ask.timeout: 1000 s
> akka.client.timeout: 1000 s
> akka.lookup.timeout: 1000 s
>
> web.timeout: 1000000
> taskmanager.debug.memory.log: true
>
> *hdfs-site.xml:*
> <?xml version="1.0" encoding="UTF-8"?>
> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
> <!--
>   Licensed under the Apache License, Version 2.0 (the "License");
>   you may not use this file except in compliance with the License.
>   You may obtain a copy of the License at
>
>     http://www.apache.org/licenses/LICENSE-2.0
>
>   Unless required by applicable law or agreed to in writing, software
>   distributed under the License is distributed on an "AS IS" BASIS,
>   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>   See the License for the specific language governing permissions and
>   limitations under the License. See accompanying LICENSE file.
> -->
>
> <!-- Put site-specific property overrides in this file. -->
>
> <configuration>
>
> </configuration>
>
> On Tue, 15 Oct 2019 at 08:38, 刘芃成 <[hidden email]> wrote:
>
>> Maybe you can paste your Flink configuration and hdfs-site.xml and check
>> whether there are any problems in the HDFS file-system-related configuration.
>> Also, you should check whether this path really exists on HDFS with an hdfs
>> shell command (e.g. hdfs dfs -ls /xxx, see
>> https://hadoop.apache.org/docs/r2.7.5/hadoop-project-dist/hadoop-common/FileSystemShell.html).
>>
>> At 2019-10-15 01:27:39, "Pritam Sadhukhan" <[hidden email]> wrote:
>> >Hi,
>> >
>> >I am trying to use orcsourcetable to fetch data stored in hive tables on
>> >hdfs.
>> >I am able to use the orcsourcetable to fetch the data and deserialize on
>> >local cluster.
>> >
>> >But when I am trying to use the hdfs path, it is throwing me file not found
>> >error.
>> >
>> >Any help will be appreciated on the topic.
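One observation on the trace above, offered as a guess rather than a confirmed diagnosis: the failing frames are in org.apache.hadoop.fs.RawLocalFileSystem, and the file in the message is reported as /logs/sa_structured_events/... without the hdfs://host:port prefix. That is what one would expect to see if the path were being resolved against the local file system instead of HDFS. The sketch below uses only java.net.URI to show how dropping the scheme and authority turns the HDFS location into a bare path. The host name, the port 8020, and the class and method names are placeholders for illustration; it does not call any Flink or Hadoop API.

```java
import java.net.URI;

class PathSchemeCheck {

    /** Returns only the path component, dropping scheme and authority. */
    static String stripSchemeAndAuthority(String location) {
        return URI.create(location).getPath();
    }

    /** True when the location names its file system explicitly, e.g. hdfs://host:port/... */
    static boolean isFullyQualified(String location) {
        URI uri = URI.create(location);
        return uri.getScheme() != null && uri.getAuthority() != null;
    }

    public static void main(String[] args) {
        // Placeholder NameNode address; substitute the real host and port.
        String location = "hdfs://namenode-host:8020/logs/sa_structured_events";

        String barePath = stripSchemeAndAuthority(location);
        System.out.println("fully qualified:           " + isFullyQualified(location));
        System.out.println("bare path:                 " + barePath);
        System.out.println("bare path still qualified: " + isFullyQualified(barePath));
    }
}
```

If this is what is happening, it may be worth checking that the TaskManagers can see a Hadoop configuration in which fs.defaultFS points at the NameNode (for example through HADOOP_CONF_DIR), since the hdfs-site.xml pasted above is empty.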