Flink performance issue with Google Cloud Storage (Small Parquet Files)

Flink performance issue with Google Cloud Storage (Small Parquet Files)

Sivaraman Venkataraman, Aswin Ram
Hi Everyone,
Hope everything is well. We are using Flink's Table API to read data from Kafka and write it to Google Cloud Storage in Parquet format. The Flink version we are using is 1.11.2, and the checkpointing interval we specified is 3 minutes. The issue we are facing is that, although we produced around 7.5 million records to Kafka, the Parquet files created in Google Cloud Storage are only a few hundred KB in size (about 378 KB) and contain far fewer records (around 1,200 records per file) than what was generated. We set the logging property to DEBUG and re-ran our Flink job on the same cluster.

The following are the exceptions we found in the Flink task manager log.
We see the two HTTP 404 exceptions below, thrown by the GCS Hadoop connector for records written by Flink.
We want to know whether the community has faced issues when streaming data through Flink from Kafka to Google Cloud Storage. If yes, what were those issues and how were they resolved? It would also be great if you could share the core-site.xml and hive-site.xml configurations that need to be changed when using Google Cloud Storage for persistence.

Further information:
We ran our Flink job on a Google Dataproc cluster via YARN, with Google Cloud Storage as the storage layer. We followed the approach described at https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html#streaming-writing.
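
For context, here is a rough sketch of the kind of job we are running (Flink 1.11 Table API, Java). The table names, fields and connector options below are simplified placeholders rather than our exact production DDL:

// Rough sketch only (Flink 1.11 Table API); names, fields and options are placeholders.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class KafkaToGcsParquetJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3 * 60 * 1000); // 3-minute checkpoint interval

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Hive catalog backed by the Dataproc metastore (warehouse directory points at gs://...)
        HiveCatalog hive = new HiveCatalog("myhive", "test_ddl_generator_hive", "/etc/hive/conf");
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");

        // Kafka source table (default SQL dialect)
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql(
            "CREATE TABLE kafka_hits (" +
            "  tenantid STRING," +
            "  payload STRING," +
            "  ts TIMESTAMP(3)," +
            "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'action_hit'," +
            "  'properties.bootstrap.servers' = 'broker:9092'," +
            "  'scan.startup.mode' = 'earliest-offset'," +
            "  'format' = 'json')");

        // Partitioned Hive table stored as Parquet, following the streaming-writing docs
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql(
            "CREATE TABLE IF NOT EXISTS action_hit_parquet (payload STRING) " +
            "PARTITIONED BY (tenantid STRING, ts_date STRING, ts_hour STRING) " +
            "STORED AS PARQUET TBLPROPERTIES (" +
            "  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:00:00'," +
            "  'sink.partition-commit.trigger' = 'partition-time'," +
            "  'sink.partition-commit.policy.kind' = 'metastore,success-file')");

        // Continuous insert from Kafka into the Hive/Parquet table on GCS
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql(
            "INSERT INTO action_hit_parquet " +
            "SELECT payload, tenantid, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') " +
            "FROM kafka_hits");
    }
}

The sink table in this sketch is partitioned by tenantid/ts_date/ts_hour, matching the paths that show up in the GCS debug logs below.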

The first exception, pasted below, is the one we get when Flink tries to create an in-progress file in Google Cloud Storage:

2021-01-14 02:43:54,196 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem [] - create(path: gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437, options: CreateFileOptions{overwriteExisting=false, contentType=application/octet-stream, attributes={}, checkNoDirectoryConflict=true, ensureParentDirectoriesExist=true, resources=-1})
2021-01-14 02:43:54,196 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StorageResourceId [] - fromUriPath('gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437', true)
2021-01-14 02:43:54,196 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StringPaths [] - validateObjectName('/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437', true)
2021-01-14 02:43:54,196 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StringPaths [] - validateObjectName -> 'hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437'
2021-01-14 02:43:54,196 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl [] - getItemInfo(gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437/)
2021-01-14 02:43:54,196 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl [] - getObject(gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437/)
2021-01-14 02:43:54,226 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl [] - getObject(gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437/): not found
com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
GET https://storage.googleapis.com/storage/v1/b/hermes-alpha2-bucket/o/hive-warehouse%2Ftest_ddl_generator_hive.db%2Faction_hit_onetenant_onepart_debug%2Ftenantid=Netflix%2Fts_date=2020-01-13%2Fts_hour=23%2F.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437%2F
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "No such object: hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437/",
    "reason" : "notFound"
  } ],
  "message" : "No such object: hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437/"
}
                at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:149) ~[gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:112) ~[gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:39) ~[gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:443) ~[gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1108) ~[gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:541) ~[gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:474) ~[gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591) ~[gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getObject(GoogleCloudStorageImpl.java:1959) [gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getItemInfo(GoogleCloudStorageImpl.java:1857) [gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfoInternal(GoogleCloudStorageFileSystem.java:1110) [gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.create(GoogleCloudStorageFileSystem.java:224) [gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:84) [gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:76) [gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:691) [gcs-connector-hadoop3-2.1.6.jar:?]
                at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1119) [hadoop-common-3.2.1.jar:?]
                at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1099) [hadoop-common-3.2.1.jar:?]
                at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74) [hive-exec.jar:3.1.2]
                at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:295) [hive-exec.jar:3.1.2]
                at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:466) [hive-exec.jar:3.1.2]
                at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:416) [hive-exec.jar:3.1.2]
                at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:411) [hive-exec.jar:3.1.2]
                at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.<init>(ParquetRecordWriterWrapper.java:70) [hive-exec.jar:3.1.2]
                at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:137) [hive-exec.jar:3.1.2]
                at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:126) [hive-exec.jar:3.1.2]
                at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:297) [hive-exec.jar:3.1.2]
                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_275]
                at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_275]
                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_275]
                at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_275]
                at org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:55) [flink-connector-hive_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:151) [flink-connector-hive_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.connectors.hive.write.HiveBulkWriterFactory.create(HiveBulkWriterFactory.java:47) [flink-connector-hive_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:257) [flink-connector-hive_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:230) [flink-connector-hive_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:119) [flink-table-blink_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at StreamExecCalc$42.processElement(Unknown Source) [flink-table-blink_2.12-1.11.2.jar:?]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at StreamExecCalc$19.processElement(Unknown Source) [flink-table-blink_2.12-1.11.2.jar:?]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?]
                at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?]
                at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?]
                at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?]
                at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) [flink-dist_2.12-1.11.2.jar:1.11.2]


The second exception is thrown after the file creation, when Flink tries to write a record to the Parquet file:

2021-01-14 02:43:54,666 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StringPaths [] - validateObjectName('hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437', true)
2021-01-14 02:43:54,666 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StringPaths [] - validateObjectName -> 'hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437'
2021-01-14 02:43:54,666 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StorageResourceId [] - fromUriPath('gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437', true)
2021-01-14 02:43:54,666 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StringPaths [] - validateObjectName('/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437', true)
2021-01-14 02:43:54,666 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.StringPaths [] - validateObjectName -> 'hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437'
2021-01-14 02:43:54,666 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl [] - getItemInfo(gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437)
2021-01-14 02:43:54,666 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl [] - getObject(gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437)
2021-01-14 02:43:54,688 DEBUG com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl [] - getObject(gs://hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437): not found
com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
GET https://storage.googleapis.com/storage/v1/b/hermes-alpha2-bucket/o/hive-warehouse%2Ftest_ddl_generator_hive.db%2Faction_hit_onetenant_onepart_debug%2Ftenantid=Netflix%2Fts_date=2020-01-13%2Fts_hour=23%2F.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "No such object: hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437",
    "reason" : "notFound"
  } ],
  "message" : "No such object: hermes-alpha2-bucket/hive-warehouse/test_ddl_generator_hive.db/action_hit_onetenant_onepart_debug/tenantid=Netflix/ts_date=2020-01-13/ts_hour=23/.part-a1bc5a6e-dd86-409b-afad-f6bad4dbb6a2-0-0.inprogress.53271f4e-cd05-421e-b7c8-6c3ac77f4437"
}
                at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:149) ~[gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:112) ~[gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:39) ~[gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:443) ~[gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1108) ~[gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:541) ~[gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:474) ~[gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591) ~[gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getObject(GoogleCloudStorageImpl.java:1959) [gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getItemInfo(GoogleCloudStorageImpl.java:1857) [gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfoInternal(GoogleCloudStorageFileSystem.java:1115) [gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.getFileInfo(GoogleCloudStorageFileSystem.java:1083) [gcs-connector-hadoop3-2.1.6.jar:?]
                at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getFileStatus(GoogleHadoopFileSystemBase.java:1079) [gcs-connector-hadoop3-2.1.6.jar:?]
                at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1684) [hadoop-common-3.2.1.jar:?]
                at org.apache.flink.connectors.hive.write.HiveBulkWriterFactory$1.getSize(HiveBulkWriterFactory.java:55) [flink-connector-hive_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter.getSize(HadoopPathBasedPartFileWriter.java:84) [flink-connector-hive_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:451) [flink-table-blink_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.table.filesystem.FileSystemTableSink$TableRollingPolicy.shouldRollOnEvent(FileSystemTableSink.java:421) [flink-table-blink_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:193) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:119) [flink-table-blink_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at StreamExecCalc$42.processElement(Unknown Source) [flink-table-blink_2.12-1.11.2.jar:?]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123) [flink-table-blink_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at StreamExecCalc$19.processElement(Unknown Source) [flink-table-blink_2.12-1.11.2.jar:?]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?]
                at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?]
                at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?]
                at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) [blob_p-ab51099ac45fd319c32b31f607c211098689d76e-be571f2e710be87469f7c7762a5d6135:?]
                at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) [flink-dist_2.12-1.11.2.jar:1.11.2]
                at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) [flink-dist_2.12-1.11.2.jar:1.11.2]

Thanks in Advance
Aswin

Re: Flink performance issue with Google Cloud Storage (Small Parquet Files)

Till Rohrmann
Hi Aswin,

I am not aware of any particular problems. In order to pinpoint the problem, I would suggest reducing the complexity of the job to see where it goes wrong. For example, you could try not using the Hive connector and instead write directly to GCS. Then you could try writing to a different DFS in order to rule out that the problem is related to the GCS filesystem. Then maybe try not using Parquet as the output format, etc.

It would also help if you could share a minimal, self-contained job with example data that allows us to reproduce the problem.
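
For illustration, a stripped-down job along these lines could already tell you whether the plain GCS + Parquet path works without the Hive connector (untested sketch; the bucket path is a placeholder and the datagen connector stands in for Kafka to provide example data):

// Untested sketch: datagen source for example data, plain filesystem connector
// writing Parquet directly to GCS (no Hive catalog); the bucket path is a placeholder.
// Requires the Flink Parquet format dependency on the classpath.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DirectGcsParquetTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3 * 60 * 1000); // checkpoints are needed to finalize the files

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Generated example data instead of Kafka
        tEnv.executeSql(
            "CREATE TABLE source_data (" +
            "  id BIGINT," +
            "  payload STRING" +
            ") WITH (" +
            "  'connector' = 'datagen'," +
            "  'rows-per-second' = '1000')");

        // Parquet files written straight to GCS via the filesystem connector
        tEnv.executeSql(
            "CREATE TABLE gcs_sink (" +
            "  id BIGINT," +
            "  payload STRING" +
            ") WITH (" +
            "  'connector' = 'filesystem'," +
            "  'path' = 'gs://<your-bucket>/flink-direct-test/'," +
            "  'format' = 'parquet'," +
            "  'sink.rolling-policy.file-size' = '128MB')");

        tEnv.executeSql("INSERT INTO gcs_sink SELECT id, payload FROM source_data");
    }
}

Swapping the 'path' to an hdfs:// location, or the 'format' to csv, would then isolate whether GCS or Parquet is the variable that matters.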

Cheers,
Till
