We use Flink to process transactional events. We created a job that aggregates information per client, day of week, and hour of day, building a profile as shown in the code below.
val stream = env.addSource(consumer)
val result = stream
  .map(openTransaction => {
    val transactionDate = openTransaction.get("transactionDate")
    val date = if (transactionDate.isTextual)
      LocalDateTime.parse(transactionDate.asText, DateTimeFormatter.ISO_DATE_TIME)
        .toInstant(ZoneOffset.UTC).toEpochMilli
    else
      transactionDate.asLong
    (openTransaction.get("clientId").asLong,
      openTransaction.get("amount").asDouble, new Timestamp(date))
  })
  .keyBy(0)
  .window(SlidingEventWeekTimeWindows.of(Time.days(28), Time.hours(1)))
  .sum(1)

In the code above, the stream has three fields: "transactionDate", "clientId" and "amount". We key the stream by clientId and apply a sliding window that sums the amount. There are around 100,000 unique active clientIds in our database.

After running for some time, the job's total RAM usage stabilizes at 36 GB, while the checkpoint stored in HDFS uses only 3 GB. Is there a way to reduce the job's RAM usage, maybe by configuring Flink's replication factor or by using RocksDB?

Best regards
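For reference, the RocksDB state backend asked about here can be enabled with a one-line configuration change. This is a sketch, not the thread's code: the checkpoint path is illustrative, and it assumes the Flink 1.5-era API with the flink-statebackend-rocksdb dependency on the classpath.

```scala
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Keyed state lives on local disk (with an in-memory cache) instead of
// the JVM heap; checkpoints are still written to the given URI.
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"))
```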
Hi Gabriel,
Yes, using the RocksDB state backend can relieve your RAM usage. I see a few issues with your job:

1) It keeps track of 672 windows per key (28 days x 24 hours), which is a lot of data, so try to reduce the number of windows.
2) Use a reduce function to incrementally aggregate state rather than buffering the data internally.

BTW, this kind of question should be posted to the user@flink list rather than dev@flink.

Bowen

On Wed, May 2, 2018 at 2:20 PM, Gabriel Pelielo <[hidden email]> wrote:
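To make the second point concrete, here is a plain-Scala sketch (no Flink dependency; `bufferedState` and `incrementalState` are illustrative names, not Flink API) contrasting buffered window state with an incrementally reduced accumulator:

```scala
// Compare the state needed to buffer every element per key per window
// with a single incrementally updated accumulator.

val amounts = Seq(10.0, 25.5, 4.5) // three transactions for one client

// Buffering approach: window state holds all elements until the window fires.
val bufferedState: Seq[Double] = amounts // O(n) elements per key per window
val resultBuffered = bufferedState.sum

// Incremental approach (what a ReduceFunction gives you): the state is one
// running sum, updated per element, O(1) per key per window.
val incrementalState = amounts.foldLeft(0.0)(_ + _)

assert(resultBuffered == incrementalState) // same result, smaller state
```

Note that, as far as I know, `.sum(1)` on a windowed stream already aggregates incrementally under the hood, so in this particular job the dominant cost is likely the 672 overlapping windows per key rather than buffered elements.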
Agree with Bowen on this note: you should probably use a more efficient way of handling the data in the sliding window, since each element is "assigned" to every overlapping sliding window by the window assigner, which costs extra memory.

BTW, since we are on this topic: I was wondering if there is any way to improve memory efficiency when dealing with elements that belong to overlapping windows.

--
Rong

On Thu, May 3, 2018 at 9:40 PM, Bowen Li <[hidden email]> wrote:
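Rong's point can be made concrete with a plain-Scala sketch of sliding-window assignment. This simplifies the logic of Flink's standard sliding event-time assigner (offset = 0) and is not the thread's actual assigner:

```scala
// Compute the start timestamps of every sliding window an event falls into.
def assignWindows(timestamp: Long, size: Long, slide: Long): Seq[Long] = {
  val lastStart = timestamp - (timestamp % slide) // most recent window start
  // Every start in (timestamp - size, lastStart], stepping back by slide.
  (lastStart to (timestamp - size + 1) by -slide).toSeq
}

val size  = 28L * 24 * 60 * 60 * 1000 // 28 days in ms
val slide = 1L * 60 * 60 * 1000       // 1 hour in ms

// Every single event is assigned to size / slide = 672 windows, so any
// per-window state is multiplied by 672 for each key.
assert(assignWindows(1525312800000L, size, slide).size == 672)
```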
Hi,
There are a few JIRA tickets that address this problem [1] [2]. In summary: the best execution strategy depends on the amount of data and the window configuration.

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-7001
[2] https://issues.apache.org/jira/browse/FLINK-5387

2018-05-04 7:22 GMT+02:00 Rong Rong <[hidden email]>:
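The slicing idea explored in those tickets can be sketched in plain Scala (illustrative only; `panes` and `windowSum` are hypothetical names, not Flink API): each event updates one non-overlapping pane of slide length, and a window result is computed by combining the panes it covers, so each event touches one pane instead of 672 window accumulators.

```scala
import scala.collection.mutable

val slide = 60L * 60 * 1000 // 1-hour pane in ms
val panes = mutable.Map.empty[Long, Double].withDefaultValue(0.0)

def addEvent(timestamp: Long, amount: Double): Unit = {
  val pane = timestamp - (timestamp % slide) // exactly one pane per event
  panes(pane) += amount
}

def windowSum(windowEnd: Long, size: Long): Double =
  // Combine the panes covering [windowEnd - size, windowEnd).
  ((windowEnd - size) until windowEnd by slide).map(panes).sum

addEvent(0L, 10.0)
addEvent(slide + 1, 5.0) // lands in the second pane
assert(windowSum(windowEnd = 2 * slide, size = 2 * slide) == 15.0)
```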