Flink 1.3.2 with CEP Pattern, Memory usage increases results in OOM


Abiramalakshmi Natarajan
I am using a Flink 1.3.2 CEP pattern to detect a frequently occurring
condition. When scale testing this pattern at 10k events per minute, memory
usage keeps growing until the job finally fails with an OOM error.

I found a related JIRA issue, FLINK-7606, which mentions specifying
EventTime as the streamTimeCharacteristic.

I have configured this, and I am using RMQSource. I am still facing the
memory leak. Can you please let me know whether I am missing anything?

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<ApCheckInEvent> apCheckInReqStreams =
        env.addSource(RMQSourceHelper.createAPCheckInSource(parameters))
                .assignTimestampsAndWatermarks(new IngestionTimeExtractor<ApCheckInEvent>());

DataStream<ApConnectEvent> apconnectReqStream =
        env.addSource(RMQSourceHelper.createAPConnectSource(parameters))
                .assignTimestampsAndWatermarks(new IngestionTimeExtractor<ApConnectEvent>());

DataStream<Event> apcheckInEventStream =
        apCheckInReqStreams.flatMap(new APCheckInEventMapper());

DataStream<Event> apconnectEventStream =
        apconnectReqStream.flatMap(new APConnectCheckInEventMapper());

DataStream<Event> unifiedDevicecheckInStream =
        apcheckInEventStream.union(apconnectEventStream);

DataStream<DeviceCheckInEvent> deviceCheckInStream =
        unifiedDevicecheckInStream.flatMap(new FlatMapFunction<Event, DeviceCheckInEvent>() {

            @Override
            public void flatMap(Event value, Collector<DeviceCheckInEvent> out) throws Exception {
                DeviceCheckInEvent deviceCheckInEvent = new DeviceCheckInEvent();
                deviceCheckInEvent.setEntityType(value.getDeviceType());
                deviceCheckInEvent.setDeviceMacAddress(value.getMac());
                deviceCheckInEvent.setEventType(value.getType());
                deviceCheckInEvent.setSerialNum(value.getSource());
                deviceCheckInEvent.setDeviceModel(value.getModel());
                deviceCheckInEvent.setStreamType(StreamType.getStreamType(value.getType()));
                deviceCheckInEvent.setEventTime(value.getTimeStamp());
                deviceCheckInEvent.setTenantId(value.getTenantId());
                deviceCheckInEvent.setSiteid(value.getSiteId());
                out.collect(deviceCheckInEvent);
            }
        });


Below is my pattern detection code:

Pattern<DeviceCheckInEvent, ?> connectEventPattern1 =
        Pattern.<DeviceCheckInEvent>begin("first")
                .where(new IterativeCondition<DeviceCheckInEvent>() {

                    @Override
                    public boolean filter(DeviceCheckInEvent value, Context<DeviceCheckInEvent> ctx)
                            throws Exception {
                        return value.getEventType() == EventType.ApConnect;
                    }
                })
                .times(4)
                .within(Time.minutes(6));

DataStream<Event> frequentEventStream =
        CEP.pattern(inputStream.keyBy(new KeySelector<DeviceCheckInEvent, String>() {

            @Override
            public String getKey(DeviceCheckInEvent arg0) throws Exception {
                return arg0.getSerialNum();
            }
        }), connectEventPattern1).flatSelect(new PatternFlatSelectFunction<DeviceCheckInEvent, Event>() {

            @Override
            public void flatSelect(Map<String, List<DeviceCheckInEvent>> pattern, Collector<Event> out)
                    throws Exception {
                List<DeviceCheckInEvent> connectOccurrences = pattern.get("first");

                DeviceCheckInEvent freqConnEvent =
                        connectOccurrences.get(connectOccurrences.size() - 1);
                Event event = new Event(EventType.FrequentConnects,
                        freqConnEvent.getEntityType(), freqConnEvent.getTenantId(),
                        freqConnEvent.getSerialNum(), freqConnEvent.getSiteid(),
                        freqConnEvent.getMac(), freqConnEvent.getEventTime());
                event.setEventDescr(EMEventMessage.frequentconnects.getMsgFormat(
                        freqConnEvent.getEntityType(), freqConnEvent.getSerialNum(), count, timeElapsed));

                logger.warn("Detected Frequent Connects:" + builder.toString());
                out.collect(event);
            }
        });
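
For readers following along, the condition the pattern is meant to catch (4 ApConnect events for the same serial number within 6 minutes) can be modelled in plain Java. This is only an illustrative sketch and not how Flink's CEP operator is implemented: the class name FrequentConnectModel and its methods are made up for this example, and it assumes timestamps arrive in order per key. It shows the state that has to be kept per key, and that old timestamps can be dropped once they fall out of the window.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of "N connect events per key within a time window".
// Not Flink code: it only illustrates the per-key state such a check needs.
public class FrequentConnectModel {

    private final int requiredCount;
    private final long windowMillis;
    // Recent connect timestamps per serial number, oldest first.
    private final Map<String, Deque<Long>> timestampsByKey = new HashMap<>();

    public FrequentConnectModel(int requiredCount, long windowMillis) {
        this.requiredCount = requiredCount;
        this.windowMillis = windowMillis;
    }

    // Records one connect event and returns true if at least requiredCount
    // events for this key now lie within the window. Assumes in-order timestamps.
    public boolean onConnect(String serialNum, long timestampMillis) {
        Deque<Long> recent = timestampsByKey.computeIfAbsent(serialNum, k -> new ArrayDeque<>());
        recent.addLast(timestampMillis);
        // Drop timestamps that can no longer be part of a match ending now.
        while (timestampMillis - recent.peekFirst() >= windowMillis) {
            recent.removeFirst();
        }
        return recent.size() >= requiredCount;
    }

    // Total number of timestamps currently retained across all keys.
    public int retainedTimestamps() {
        int total = 0;
        for (Deque<Long> recent : timestampsByKey.values()) {
            total += recent.size();
        }
        return total;
    }
}
```

In this model the retained state per key is bounded by the number of events inside the window, which is the behaviour one would hope for from the real job as well; state that only grows suggests that old partial matches are never being pruned.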



--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
Re: Flink 1.3.2 with CEP Pattern, Memory usage increases results in OOM

Kostas Kloudas
Hi Abiramalakshmi,

Thanks for reporting this!

As a starting point I would recommend:

1) use RocksDB as your state backend, so that state is not accumulated in memory
2) enable incremental checkpoints
3) the “new IterativeCondition<DeviceCheckInEvent>() {…}” can become “new SimpleCondition<DeviceCheckInEvent>() {…}”,
    as this is more efficient
4) set the auto-watermark interval to a small value, so that watermarks are emitted frequently and elements
   do not accumulate.

If you do the above, please let me know if the problems persist.
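
As a rough illustration of the four suggestions above, a job setup along these lines might look as follows on Flink 1.3.x. This is a sketch under assumptions rather than a tested fix: the class name CepJobSetupSketch, the checkpoint URI argument, the 60 s checkpoint interval and the 200 ms watermark interval are placeholders chosen for the example, DeviceCheckInEvent and EventType are the classes from the original post, and the flink-statebackend-rocksdb dependency is assumed to be on the classpath.

```java
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

// Hypothetical helper class; only the Flink calls themselves come from the Flink API.
public class CepJobSetupSketch {

    // checkpointUri is a placeholder, e.g. an HDFS or file URI for checkpoint data.
    public static StreamExecutionEnvironment configure(String checkpointUri) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 1) + 2) RocksDB state backend; the second argument enables incremental checkpoints.
        env.setStateBackend(new RocksDBStateBackend(checkpointUri, true));
        env.enableCheckpointing(60_000L); // assumed interval: one checkpoint per minute

        // 4) Emit watermarks frequently (assumed value: every 200 ms).
        env.getConfig().setAutoWatermarkInterval(200L);
        return env;
    }

    // 3) Same pattern as in the original post, with SimpleCondition instead of IterativeCondition.
    public static Pattern<DeviceCheckInEvent, ?> connectPattern() {
        return Pattern.<DeviceCheckInEvent>begin("first")
                .where(new SimpleCondition<DeviceCheckInEvent>() {
                    @Override
                    public boolean filter(DeviceCheckInEvent value) {
                        return value.getEventType() == EventType.ApConnect;
                    }
                })
                .times(4)
                .within(Time.minutes(6));
    }
}
```

The intervals are starting points to tune, not recommendations from this thread; the point is only where each of the four settings lives in the API.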

Thanks,
Kostas


> On Apr 10, 2018, at 1:19 PM, Abiramalakshmi Natarajan <[hidden email]> wrote:
>
> new Iter