Hi Everyone,
Hope everything is well. We are using Flink's Table API to read data from Kafka and write it to Google Cloud Storage in Parquet format. The Flink version we are using is 1.11.2, and the checkpointing interval we specified is 3 minutes.

The issue we are facing is that, although we produced around 7.5 million records to Kafka, the Parquet files created in Google Cloud Storage are only a few hundred KB in size (378 KB) and contain far fewer records (about 1,200 per file) than were generated. We set the logging level to DEBUG and re-ran our Flink job in the same cluster. Below are the exceptions we found in the Flink task manager log: two HTTP 404 exceptions thrown by the GCS Hadoop connector for records written by Flink.

Has the community run into issues when streaming data through Flink from Kafka to Google Cloud Storage? If yes, what were those issues and how were they resolved? It would also be great if you could share the core-site.xml and hive-site.xml settings that need to be changed when using Google Cloud Storage for persistence.

Further information: we ran our Flink job on a Google Dataproc cluster via YARN, with Google Cloud Storage as the storage layer. We followed the approach described in https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html#streaming-writing (a sketch of this pattern is included after the log excerpts below).

The first exception, pasted below, is the one we get when Flink tries to create an in-progress file in Google Cloud Storage:

2021-01-14 02:43:54,196 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem [] - create(path: gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437, options: CreateFileOptions{overwriteExisting=false, contentType=application/octet-stream, attributes={}, checkNoDirectoryConflict=true, ensureParentDirectoriesExist=true, resources=-1})
2021-01-14 02:43:54,196 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StorageResourceId [] - fromUriPath('gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437', true)
2021-01-14 02:43:54,196 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StringPaths [] - validateObjectName('/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437', true)
2021-01-14 02:43:54,196 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StringPaths [] - validateObjectName -> 'hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437'
2021-01-14 02:43:54,196 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl [] -
getItemInfo(gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437/)
2021-01-14 02:43:54,196 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl [] - getObject(gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437/)
2021-01-14 02:43:54,226 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl [] - getObject(gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437/): not found
com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
GET https://storage.googleapis.com/storage/v1/b/hermes-alpha2-bucket/o/hive-warehouse%2Ftest_ddl_generator_hive.db%2Faction_hit_onetenant_onepart_debug%2Ftenantid=Netflix%2Fts_date=2020-01-13%2Fts_hour=23%2F.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437%2F
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "No such object: hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437/",
    "reason" : "notFound"
  } ],
  "message" : "No such object: hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437/"
}
    at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:149) ~[gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:112) ~[gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:39) ~[gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:443) ~[gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1108) ~[gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:541) ~[gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:474) ~[gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591) ~[gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getObject(GoogleCloudStorageImpl.java:1959) [gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getItemInfo(GoogleCloudStorageImpl.java:1857) [gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfoInternal(GoogleCloudStorageFileSystem.java:1110) [gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.create(GoogleCloudStorageFileSystem.java:224) [gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:84) [gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:76) [gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:691) [gcs-connector-hadoop3-2.1.6.jar:?]
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1119) [hadoop-common-3.2.1.jar:?]
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1099) [hadoop-common-3.2.1.jar:?]
    at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74) [hive-exec.jar:3.1.2]
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:295) [hive-exec.jar:3.1.2]
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:466) [hive-exec.jar:3.1.2]
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:416) [hive-exec.jar:3.1.2]
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:411) [hive-exec.jar:3.1.2]
    at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.<init>(ParquetRecordWriterWrapper.java:70) [hive-exec.jar:3.1.2]
    at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:137) [hive-exec.jar:3.1.2]
    at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:126) [hive-exec.jar:3.1.2]
    at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:297) [hive-exec.jar:3.1.2]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_275]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_275]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_275]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_275]
    at org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:55) [flink-connector-hive_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:151) [flink-connector-hive_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.connectors.hive.write.HiveBulkWriterFactory.create(HiveBulkWriterFactory.java:47) [flink-connector-hive_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:257) [flink-connector-hive_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:230) [flink-connector-hive_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:119) [flink-table-blink_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at StreamExecCalc$42.processElement(Unknown Source) [flink-table-blink_2.12-1.11.2.jar:?]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at StreamExecCalc$19.processElement(Unknown Source) [flink-table-blink_2.12-1.11.2.jar:?]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) [flink-dist_2.12-1.11.2.jar:1.11.2]

The second exception is thrown after the file has been created, when Flink tries to write a record to the Parquet file:

2021-01-14 02:43:54,666 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StringPaths [] - validateObjectName('hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437', true)
2021-01-14 02:43:54,666 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StringPaths [] - validateObjectName -> 'hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437'
2021-01-14 02:43:54,666 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StorageResourceId [] - fromUriPath('gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437', true)
2021-01-14 02:43:54,666 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StringPaths [] - validateObjectName('/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437', true)
2021-01-14 02:43:54,666 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StringPaths [] - validateObjectName -> 'hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437'
2021-01-14 02:43:54,666 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl [] - getItemInfo(gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437)
2021-01-14 02:43:54,666 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl [] - getObject(gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437)
2021-01-14 02:43:54,688 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl [] - getObject(gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437): not found
com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
GET https://storage.googleapis.com/storage/v1/b/hermes-alpha2-bucket/o/hive-warehouse%2Ftest_ddl_generator_hive.db%2Faction_hit_onetenant_onepart_debug%2Ftenantid=Netflix%2Fts_date=2020-01-13%2Fts_hour=23%2F.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "No such object: hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437",
    "reason" : "notFound"
  } ],
  "message" : "No such object: hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437"
}
    at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:149) ~[gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:112) ~[gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:39) ~[gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:443) ~[gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1108) ~[gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:541) ~[gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:474) ~[gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591) ~[gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getObject(GoogleCloudStorageImpl.java:1959) [gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getItemInfo(GoogleCloudStorageImpl.java:1857) [gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfoInternal(GoogleCloudStorageFileSystem.java:1115) [gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfo(GoogleCloudStorageFileSystem.java:1083) [gcs-connector-hadoop3-2.1.6.jar:?]
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getFileStatus(GoogleHadoopFileSystemBase.java:1079) [gcs-connector-hadoop3-2.1.6.jar:?]
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1684) [hadoop-common-3.2.1.jar:?]
    at org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:55) [flink-connector-hive_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84) [flink-connector-hive_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451) [flink-table-blink_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421) [flink-table-blink_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:119) [flink-table-blink_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at StreamExecCalc$42.processElement(Unknown Source) [flink-table-blink_2.12-1.11.2.jar:?]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123) [flink-table-blink_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at StreamExecCalc$19.processElement(Unknown Source) [flink-table-blink_2.12-1.11.2.jar:?]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [flink-dist_2.12-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) [flink-dist_2.12-1.11.2.jar:1.11.2]

Thanks in advance,
Aswin
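For context, the streaming-writing pattern from the linked documentation page that we followed looks roughly like the sketch below. The catalog name, hive-conf directory, table, topic, field names, and TBLPROPERTIES values are placeholders, not our actual DDL; only the overall shape (a Hive-dialect partitioned Parquet table, a default-dialect Kafka source, a continuous INSERT, and the 3-minute checkpoint interval) matches the setup described above.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveStreamingWriteSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3 * 60 * 1000); // 3-minute checkpoint interval, as in our job

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Register a Hive catalog; the database name and hive-site.xml directory are placeholders
        HiveCatalog hiveCatalog = new HiveCatalog("myhive", "mydb", "/etc/hive/conf");
        tEnv.registerCatalog("myhive", hiveCatalog);
        tEnv.useCatalog("myhive");

        // Hive-dialect DDL: partitioned Parquet table with partition-commit settings
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS sink_table (payload STRING) "
                        + "PARTITIONED BY (ts_date STRING, ts_hour STRING) STORED AS PARQUET TBLPROPERTIES ("
                        + " 'partition.time-extractor.timestamp-pattern'='$ts_date $ts_hour:00:00',"
                        + " 'sink.partition-commit.trigger'='partition-time',"
                        + " 'sink.partition-commit.delay'='1 h',"
                        + " 'sink.partition-commit.policy.kind'='metastore,success-file')");

        // Default-dialect DDL: Kafka source table with a watermark for partition-time commits
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS kafka_source ("
                        + " payload STRING, ts TIMESTAMP(3),"
                        + " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
                        + ") WITH ("
                        + " 'connector' = 'kafka',"
                        + " 'topic' = 'input-topic',"
                        + " 'properties.bootstrap.servers' = 'broker:9092',"
                        + " 'scan.startup.mode' = 'earliest-offset',"
                        + " 'format' = 'json')");

        // Continuous streaming insert from Kafka into the partitioned Hive table
        tEnv.executeSql(
                "INSERT INTO sink_table "
                        + "SELECT payload, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') FROM kafka_source");
    }
}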
Hi Aswin,
I am not aware of any particular problems. In order to pinpoint the issue, I would suggest reducing the complexity of the job to see where it goes wrong. For example, you could try not using the Hive connector and instead write directly to GCS (see the sketch below). Then you could try writing to a different DFS in order to rule out that the problem is related to the GCS filesystem. Then maybe try not using Parquet as the output format, and so on.

What would also help is a minimal self-contained job with example data that allows us to reproduce the problem.

Cheers,
Till
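A minimal sketch of such a reduced job, using the filesystem connector to write Parquet straight to a gs:// path instead of going through the Hive connector, could look like the following. The topic, brokers, bucket path, and field names are placeholders, not taken from the original job.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaToGcsParquetSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3 * 60 * 1000); // keep the same 3-minute checkpoint interval

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Kafka source table (topic, brokers, and fields are placeholders)
        tEnv.executeSql(
                "CREATE TABLE kafka_source ("
                        + " payload STRING, ts TIMESTAMP(3)"
                        + ") WITH ("
                        + " 'connector' = 'kafka',"
                        + " 'topic' = 'input-topic',"
                        + " 'properties.bootstrap.servers' = 'broker:9092',"
                        + " 'scan.startup.mode' = 'earliest-offset',"
                        + " 'format' = 'json')");

        // Filesystem sink writing Parquet directly to GCS, bypassing the Hive connector entirely
        tEnv.executeSql(
                "CREATE TABLE gcs_sink ("
                        + " payload STRING, ts_date STRING, ts_hour STRING"
                        + ") PARTITIONED BY (ts_date, ts_hour) WITH ("
                        + " 'connector' = 'filesystem',"
                        + " 'path' = 'gs://my-test-bucket/flink-parquet-out',"
                        + " 'format' = 'parquet',"
                        + " 'sink.partition-commit.trigger' = 'process-time',"
                        + " 'sink.partition-commit.policy.kind' = 'success-file')");

        // Swap the path to another DFS (e.g. hdfs://...) or the format to csv/json
        // to narrow down whether the problem is GCS-specific or Parquet-specific.
        tEnv.executeSql(
                "INSERT INTO gcs_sink "
                        + "SELECT payload, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') FROM kafka_source");
    }
}

If this reduced job produces correctly sized files, the problem is more likely in the Hive writer path; if it shows the same tiny files, that points at the GCS filesystem or the Parquet format layer.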
"global", > "message" : "No such object: > hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437/", > "reason" : "notFound" > } ], > "message" : "No such object: > hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437/" > } > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:149) > ~[gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:112) > ~[gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:39) > ~[gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:443) > ~[gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1108) > ~[gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:541) > ~[gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:474) > ~[gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591) > ~[gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getObject(GoogleCloudStorageImpl.java:1959) > [gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getItemInfo(GoogleCloudStorageImpl.java:1857) > [gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfoInternal(GoogleCloudStorageFileSystem.java:1110) > [gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.create(GoogleCloudStorageFileSystem.java:224) > [gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:84) > [gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:76) > [gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:691) > [gcs-connector-hadoop3-2.1.6.jar:?] > at > org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1119) > [hadoop-common-3.2.1.jar:?] 
> at > org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1099) > [hadoop-common-3.2.1.jar:?] > at > org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74) > [hive-exec.jar:3.1.2] > at > org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:295) > [hive-exec.jar:3.1.2] > at > org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:466) > [hive-exec.jar:3.1.2] > at > org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:416) > [hive-exec.jar:3.1.2] > at > org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:411) > [hive-exec.jar:3.1.2] > at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.<init>(ParquetRecordWriterWrapper.java:70) > [hive-exec.jar:3.1.2] > at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:137) > [hive-exec.jar:3.1.2] > at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:126) > [hive-exec.jar:3.1.2] > at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:297) > [hive-exec.jar:3.1.2] > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) ~[?:1.8.0_275] > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_275] > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_275] > at java.lang.reflect.Method.invoke(Method.java:498) > ~[?:1.8.0_275] > at > org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:55) > [flink-connector-hive_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:151) > [flink-connector-hive_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.connectors.hive.write.HiveBulkWriterFactory.create(HiveBulkWriterFactory.java:47) > [flink-connector-hive_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:257) > [flink-connector-hive_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:230) > [flink-connector-hive_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:119) > [flink-table-blink_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) > 
[flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at StreamExecCalc$42.processElement(Unknown Source) > [flink-table-blink_2.12-1.11.2.jar:?] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at StreamExecCalc$19.processElement(Unknown Source) > [flink-table-blink_2.12-1.11.2.jar:?] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) > [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?] > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) > [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?] > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) > [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?] > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) > [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?] 
> at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) > [flink-dist_2.12-1.11.2.jar:1.11.2] > > > The second exception is, which is thrown after the file creation, when it > tries to write a record to the parquet file: > > 2021-01-14 02:43:54,666 DEBUG > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StringPaths > [] - > validateObjectName('hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437', > true) > 2021-01-14 02:43:54,666 DEBUG > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StringPaths > [] - validateObjectName -> > 'hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437' > 2021-01-14 02:43:54,666 DEBUG > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StorageResourceId > [] - > fromUriPath('gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437', > true) > 2021-01-14 02:43:54,666 DEBUG > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StringPaths > [] - > validateObjectName('/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437', > true) > 2021-01-14 02:43:54,666 DEBUG > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StringPaths > [] - validateObjectName -> > 'hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437' > 2021-01-14 02:43:54,666 DEBUG > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl > [] - > getItemInfo(gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437) > 2021-01-14 02:43:54,666 DEBUG > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl > [] - > getObject(gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437) > 2021-01-14 02:43:54,688 DEBUG > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl > [] - > 
getObject(gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437): > not found > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: > 404 Not Found > GET > https://storage.googleapis.com/storage/v1/b/hermes-alpha2-bucket/o/hive-warehouse%2Ftest_ddl_generator_hive.db%2Faction_hit_onetenant_onepart_debug%2Ftenantid=Netflix%2Fts_date=2020-01-13%2Fts_hour=23%2F.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437 > { > "code" : 404, > "errors" : [ { > "domain" : "global", > "message" : "No such object: > hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437", > "reason" : "notFound" > } ], > "message" : "No such object: > hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437" > } > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:149) > ~[gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:112) > ~[gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:39) > ~[gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:443) > ~[gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1108) > ~[gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:541) > ~[gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:474) > ~[gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591) > ~[gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getObject(GoogleCloudStorageImpl.java:1959) > [gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getItemInfo(GoogleCloudStorageImpl.java:1857) > [gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfoInternal(GoogleCloudStorageFileSystem.java:1115) > [gcs-connector-hadoop3-2.1.6.jar:?] 
> at > com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfo(GoogleCloudStorageFileSystem.java:1083) > [gcs-connector-hadoop3-2.1.6.jar:?] > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getFileStatus(GoogleHadoopFileSystemBase.java:1079) > [gcs-connector-hadoop3-2.1.6.jar:?] > at > org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1684) > [hadoop-common-3.2.1.jar:?] > at > org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:55) > [flink-connector-hive_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84) > [flink-connector-hive_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451) > [flink-table-blink_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421) > [flink-table-blink_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:119) > [flink-table-blink_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at StreamExecCalc$42.processElement(Unknown Source) > [flink-table-blink_2.12-1.11.2.jar:?] 
> at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123) > [flink-table-blink_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at StreamExecCalc$19.processElement(Unknown Source) > [flink-table-blink_2.12-1.11.2.jar:?] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) > [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?] > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) > [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?] > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) > [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?] 
> at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) > [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?] > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > [flink-dist_2.12-1.11.2.jar:1.11.2] > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) > [flink-dist_2.12-1.11.2.jar:1.11.2] > > Thanks in Advance > Aswin > |