Hey,
I have a question regarding a DataStream created from multiple files in S3. I have several files in AWS S3, say under the path s3://files/, with one folder per day, so the full paths look like s3://files/day=1/file.parquet, s3://files/day=2/file.parquet. I wanted to read all the files and sort the records by a specific field.

I thought I could use the fact that a Long.MAX_VALUE watermark is generated once the bounded input is exhausted, so I decided to use an event-time window larger than the time range of the data in the files.

So, I have something like:

    val inputFormat = new ParquetAvroInputFormat[TestData](new Path("s3a://files/"))
    inputFormat.setNestedFileEnumeration(true)
    val ste = StreamExecutionEnvironment.createLocalEnvironment(1)
    ste.createInput(inputFormat)
      .assignTimestampsAndWatermarks(
        new OutOfOrdernessWatermarkStrategy[TestData](3000, _.getTimestamp))
      .keyBy(_.getKey)
      .timeWindow(Time.days(90))
      .sideOutputLateData(sideOutput)
      .process(new ProcessWindowFunction[TestData, TestData, String, TimeWindow] {
        override def process(key: String, context: Context,
                             elements: Iterable[TestData],
                             out: Collector[TestData]): Unit = {
          println("Processing: " + elements.toList.size + " for key:" + key)
          elements.toSeq.sortBy(_.getTimestamp)
            .foreach(out.collect(_))
        }
      })

ParquetAvroInputFormat and OutOfOrdernessWatermarkStrategy are my own classes, but they do not cause the issue described here.

The data in the files spans 30 days, so there is no way the window can be closed before all files have been read and the Long.MAX_VALUE watermark is generated.

Now, the problem I am observing: since the parallelism is one, I would expect to see one message printed per key. But for some reason, for most of the keys there are two windows created. I have 30 unique keys and each key contains around 1M records. The output I see looks more or less like this:

1. Several messages about "Switching to Random IO seek policy"
2. A print for most of the keys present in the dataset (but the counts are quite small, most of them around 100k, some as small as a few hundred)
3. More "Switching to Random IO seek policy" messages
4. Prints again for some keys, but now the counts are much higher

So the total count of all processed values is correct; I am just interested in why the window gets invoked twice.

Thanks in advance,
Best Regards,
Dom.
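For reference, a bounded-out-of-orderness watermark strategy similar to what the custom OutOfOrdernessWatermarkStrategy above appears to do can be sketched with Flink's built-in WatermarkStrategy API (available since Flink 1.11). This is only an illustration, assuming TestData.getTimestamp returns the event time in epoch milliseconds; it is not the original class:

    import java.time.Duration
    import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

    // Sketch: built-in bounded-out-of-orderness strategy with a 3 s bound,
    // assuming TestData.getTimestamp returns epoch milliseconds.
    val watermarkStrategy: WatermarkStrategy[TestData] =
      WatermarkStrategy
        .forBoundedOutOfOrderness[TestData](Duration.ofSeconds(3))
        .withTimestampAssigner(new SerializableTimestampAssigner[TestData] {
          override def extractTimestamp(element: TestData, recordTimestamp: Long): Long =
            element.getTimestamp
        })

Such a strategy could then be passed to assignTimestampsAndWatermarks in place of the custom class.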
Hi Dominik,
I think the problem could be that tumbling time windows don't start at the timestamp of the first arriving event but at a multiple of the window length. So when you define a 90-day tumbling window, you get windows covering days 0-89, 90-179, and so on (counted from the Unix epoch). If your data ranges from, say, day 79 to day 109, it falls into two windows.

Cheers,
Till
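For illustration, the alignment Till describes can be checked with a few lines of Scala. The windowStart function below mirrors, for non-negative timestamps, the logic of Flink's TimeWindow.getWindowStartWithOffset; the hard-coded October 2020 timestamps are only an example, assuming epoch-millisecond event times and no window offset:

    // Sketch: how the start of a tumbling window is derived from an event timestamp,
    // mirroring TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize).
    def windowStart(timestamp: Long, offset: Long, windowSize: Long): Long =
      timestamp - (timestamp - offset + windowSize) % windowSize

    val size90days = 90L * 24 * 60 * 60 * 1000      // 90 days in milliseconds
    val oct1_2020  = 1601510400000L                 // 2020-10-01 00:00:00 UTC
    val oct30_2020 = 1604016000000L                 // 2020-10-30 00:00:00 UTC

    println(windowStart(oct1_2020, 0, size90days))  // 1594080000000 -> 2020-07-07, window [Jul 7, Oct 5)
    println(windowStart(oct30_2020, 0, size90days)) // 1601856000000 -> 2020-10-05, window [Oct 5, Jan 3)

The two October timestamps land in different 90-day windows, with the boundary falling on 5 October 2020.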
Hey,
Thanks for the answer. As I mentioned in the email, the data range is only 30 days; for the tests I used data from October, so I basically have timestamps starting at midnight on 1 October 2020 and finishing at 23:59 on 30 October 2020. If I understand correctly, this shouldn't cause the double windowing, but correct me if I am wrong here.

Best Regards,
Dom.
Hey Till,
You were obviously right, my bad here. My math was incorrect. The correct reasoning is that the first few days of October (everything before the window boundary, which for a 90-day window aligned to the epoch falls on 5 October 2020) are added to the first window, and the rest of the days end up in the second window. Solved!

Thanks a lot,
Best Regards,
Dom.
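For completeness, one way to keep the whole of October 2020 inside a single window (a sketch, not discussed in the thread) is to give the tumbling window assigner an offset so that a window boundary coincides with the start of the data. Since 1 October 2020 is epoch day 18536 and 18536 % 90 == 86, an 86-day offset aligns one 90-day window to start exactly at that date; ste, inputFormat, OutOfOrdernessWatermarkStrategy and TestData are reused from the original snippet:

    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    // Sketch: shift the 90-day tumbling windows by 86 days so that one window starts
    // exactly at 2020-10-01 00:00 UTC; the assigner requires |offset| < window size.
    val windowed = ste.createInput(inputFormat)
      .assignTimestampsAndWatermarks(
        new OutOfOrdernessWatermarkStrategy[TestData](3000, _.getTimestamp))
      .keyBy(_.getKey)
      .window(TumblingEventTimeWindows.of(Time.days(90), Time.days(86)))

With that offset, every October 2020 timestamp maps to the same window, so the process function fires once per key.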