Tejas Budukh created FLINK-22803:
------------------------------------

             Summary: Running multiple CEP patterns
                 Key: FLINK-22803
                 URL: https://issues.apache.org/jira/browse/FLINK-22803
             Project: Flink
          Issue Type: Bug
          Components: Library / CEP
    Affects Versions: 1.13.0
            Reporter: Tejas Budukh

Hi,

I've tried to get help with this error on Slack, the user mailing list, and Stack Overflow, but no one has responded. I don't know how else to get help, hence this ticket.

We are running into errors when running multiple CEP patterns. Here's our use case:

We are planning to build a rule-based engine on top of Flink with a huge number of rules, and we are doing a POC for that. For the POC we have around 1000 pattern-based rules, which we translate into CEP patterns and run on a keyed stream of event data to detect patterns. We partition the stream by orgId, and each rule needs to run for each org. Here's the code we've written to implement that:

    // Key the event stream by organisation.
    DataStream<Event> partitionedInput =
            eventStream.keyBy((KeySelector<Event, String>) Event::getOrgid);

    // Generate the test rules.
    List<Rule> ruleList = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
        ruleList.add(new Rule("rule" + i, "process1", "process2", "process3"));
        ruleList.add(
                new Rule("rule" + (i + 500), "process4", "process5", "process6"));
    }

    // Build one CEP pattern, and therefore one CEP operator, per rule.
    for (Rule rule : ruleList) {
        String st = rule.getStart();
        String mi = rule.getMid();
        String en = rule.getEnd();
        String nm = rule.getName();

        Pattern<Event, ?> pattern =
                Pattern.begin(
                        Pattern.<Event>begin("start")
                                .where(
                                        new SimpleCondition<Event>() {
                                            @Override
                                            public boolean filter(Event value) throws Exception {
                                                return value.getProcess().equals(st);
                                            }
                                        })
                                .followedBy("middle")
                                .where(
                                        new SimpleCondition<Event>() {
                                            @Override
                                            public boolean filter(Event event) {
                                                return !event.getProcess().equals(mi);
                                            }
                                        })
                                .optional()
                                .followedBy("end")
                                .where(
                                        new SimpleCondition<Event>() {
                                            @Override
                                            public boolean filter(Event event) {
                                                return event.getProcess().equals(en);
                                            }
                                        }));

        PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);

        // Emit one comma-separated alert string per match.
        DataStream<String> alerts =
                patternStream.process(
                        new PatternProcessFunction<Event, String>() {
                            @Override
                            public void processMatch(
                                    Map<String, List<Event>> map, Context context,
                                    Collector<String> collector)
                                    throws Exception {
                                Event start = map.containsKey("start") ? map.get("start").get(0) : null;
                                Event middle = map.containsKey("middle") ? map.get("middle").get(0) : null;
                                Event end = map.containsKey("end") ? map.get("end").get(0) : null;
                                StringJoiner joiner = new StringJoiner(",");
                                joiner
                                        .add("Rule : " + nm + " ")
                                        .add((start == null ? "" : start.getId()))
                                        .add((middle == null ? "" : middle.getId()))
                                        .add((end == null ? "" : end.getId()));
                                collector.collect(joiner.toString());
                            }
                        });

        alerts.print();
    }

We tried to run this code on a Flink cluster with 1 task manager with 4 task slots, and the task manager crashed with the error:

    Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
        at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
        at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
        at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079)
        at org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:910)
        at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:623)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:517)
        at akka.actor.Actor.aroundReceive$(Actor.scala:515)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) timed out.
        at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:367)
        at java.base/java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:376)
        at java.base/java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:1019)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
        at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:234)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
        at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1079)
        at akka.dispatch.OnComplete.internal(Future.scala:263)
        at akka.dispatch.OnComplete.internal(Future.scala:261)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
        at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
        at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
        at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
        at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
        at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
        at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
        at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
        at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
        at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
        at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) timed out.
        at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
        at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:599)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        ... 1 more
    Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@192.168.0.4:52041/user/rpc/taskmanager_0#-1397184270]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
        at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
        at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
        at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
        at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
        at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
        at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
        at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
        at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
        at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
        ... 1 more

Can somebody help with this? Why is this code failing? Is our approach scalable, or is there a better way of doing this? Considering that every CEP operator creates a thread, will this work in production with so many threads per task slot? Does the CEP library support combining multiple patterns in a single operator/thread?
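To make the last question concrete, here is a minimal sketch of what "many rules in a single operator" could look like. It is plain Java with no Flink dependency, and it says nothing about whether the CEP library itself supports this. The names `MultiRuleMatcher`, `Rule`, and `onEvent` are hypothetical and do not come from Flink or from the code above. The matching is deliberately simplified: each rule fires when a "start" process is later followed by an "end" process for the same org, the optional "middle" step is ignored, and only one partial match per rule and org is tracked, which is not the same as CEP's NFA semantics.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one object evaluates every rule for every key,
// instead of one operator (and thread) per rule.
class MultiRuleMatcher {

    // Simplified rule: "start" process followed later by "end" process.
    static final class Rule {
        final String name;
        final String start;
        final String end;

        Rule(String name, String start, String end) {
            this.name = name;
            this.start = start;
            this.end = end;
        }
    }

    private final List<Rule> rules;

    // orgId -> (rule name -> id of the "start" event seen so far).
    // In a Flink job this would have to live in keyed state (assumption).
    private final Map<String, Map<String, String>> pending = new HashMap<>();

    MultiRuleMatcher(List<Rule> rules) {
        this.rules = rules;
    }

    // Feeds one event through all rules and returns the alerts it completes.
    List<String> onEvent(String orgId, String eventId, String process) {
        List<String> alerts = new ArrayList<>();
        Map<String, String> perOrg = pending.computeIfAbsent(orgId, k -> new HashMap<>());
        for (Rule rule : rules) {
            String startId = perOrg.get(rule.name);
            if (startId != null && process.equals(rule.end)) {
                // A start was already seen for this org: the rule matches.
                alerts.add(rule.name + ": " + startId + " -> " + eventId);
                perOrg.remove(rule.name);
            } else if (startId == null && process.equals(rule.start)) {
                // Remember the first start event for this rule and org.
                perOrg.put(rule.name, eventId);
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        List<Rule> rules = new ArrayList<>();
        rules.add(new Rule("rule0", "process1", "process3"));
        rules.add(new Rule("rule500", "process4", "process6"));
        MultiRuleMatcher matcher = new MultiRuleMatcher(rules);

        matcher.onEvent("org1", "e1", "process1");
        matcher.onEvent("org1", "e2", "process2");
        for (String alert : matcher.onEvent("org1", "e3", "process3")) {
            System.out.println(alert);
        }
    }
}
```

The cost per event grows with the number of rules, but the number of operators and threads stays constant, which is the trade-off the question is asking about.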