[jira] [Created] (FLINK-22803) Running multiple CEP patterns

Shang Yuanchun (Jira)
Tejas Budukh created FLINK-22803:
------------------------------------

             Summary: Running multiple CEP patterns
                 Key: FLINK-22803
                 URL: https://issues.apache.org/jira/browse/FLINK-22803
             Project: Flink
          Issue Type: Bug
          Components: Library / CEP
    Affects Versions: 1.13.0
            Reporter: Tejas Budukh


Hi,

I've tried to get help with this error on Slack, the user mailing list, and Stack Overflow, but no one has responded. I don't know how else to get help, hence this ticket.

We are running into errors when running multiple CEP patterns. Here's our
use case:
We are planning to build a rule-based engine on top of Flink with a huge
number of rules, and we are doing a POC for that. For the POC we have around
1000 pattern-based rules, which we translate into CEP patterns and run on a
keyed stream of event data to detect patterns. We partition the stream by
orgId, and each rule needs to run for each org. Here's the code we've written
to implement that:
// Key the stream by org so that every rule is evaluated per org.
DataStream<Event> partitionedInput =
    eventStream.keyBy((KeySelector<Event, String>) Event::getOrgid);
List<Rule> ruleList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
  ruleList.add(new Rule("rule" + i, "process1", "process2", "process3"));
  ruleList.add(
      new Rule("rule" + (i + 500), "process4", "process5", "process6"));
}
// Each rule gets its own CEP pattern, pattern stream, and print sink.
for (Rule rule : ruleList) {
  String st = rule.getStart();
  String mi = rule.getMid();
  String en = rule.getEnd();
  String nm = rule.getName();
  Pattern<Event, ?> pattern =
      Pattern.begin(
          Pattern.<Event>begin("start")
              .where(
                  new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event value) throws Exception {
                      return value.getProcess().equals(st);
                    }
                  })
              .followedBy("middle")
              .where(
                  new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) {
                      return !event.getProcess().equals(mi);
                    }
                  })
              .optional()
              .followedBy("end")
              .where(
                  new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) {
                      return event.getProcess().equals(en);
                    }
                  }));
  PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
  DataStream<String> alerts =
      patternStream.process(
          new PatternProcessFunction<Event, String>() {
            @Override
            public void processMatch(
                Map<String, List<Event>> map, Context context, Collector<String> collector)
                throws Exception {
              // "middle" is optional, so any of the steps may be absent from the match.
              Event start = map.containsKey("start") ? map.get("start").get(0) : null;
              Event middle = map.containsKey("middle") ? map.get("middle").get(0) : null;
              Event end = map.containsKey("end") ? map.get("end").get(0) : null;
              StringJoiner joiner = new StringJoiner(",");
              joiner
                  .add("Rule : " + nm + " ")
                  .add((start == null ? "" : start.getId()))
                  .add((middle == null ? "" : middle.getId()))
                  .add((end == null ? "" : end.getId()));
              collector.collect(joiner.toString());
            }
          });
  alerts.print();
}
We tried to run this code on a Flink cluster with 1 task manager and 4
task slots, and the task manager crashed with the error:
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
        at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
        at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
        at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079)
        at org.apache.flink.runtime.executiongraph.Execution.markFailed(Execution.java:910)
        at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:623)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:517)
        at akka.actor.Actor.aroundReceive$(Actor.scala:515)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) timed out.
        at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:367)
        at java.base/java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:376)
        at java.base/java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:1019)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
        at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:234)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
        at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1079)
        at akka.dispatch.OnComplete.internal(Future.scala:263)
        at akka.dispatch.OnComplete.internal(Future.scala:261)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
        at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
        at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
        at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
        at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
        at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
        at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
        at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
        at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
        at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
        at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutorGateway.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) timed out.
        at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
        at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:599)
        at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        ... 1 more
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@192.168.0.4:52041/user/rpc/taskmanager_0#-1397184270]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
        at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
        at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
        at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
        at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
        at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
        at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
        at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
        at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
        at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
        at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
        ... 1 more

 

Can somebody help with this? Why is this code failing? Is our approach
scalable, or is there a better way of doing this? Considering that every
CEP operator creates a thread, will this work in production with so many
threads per task slot? Does the CEP library support combining multiple
patterns in a single operator/thread?
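
To make the last question concrete, here is a minimal sketch in plain Java of what "many rules in one place" could look like. It is not Flink or CEP API, and it is not a confirmed fix for the error above. The class MultiRuleSketch, its nested Rule, and the method onEvent are hypothetical names used only for illustration. It is also a deliberate simplification: each rule is reduced to "start followed by end", the optional middle step is left out, and the per-org state is an in-memory map. In a Flink job the same loop would presumably have to live in a single keyed function with keyed state instead, which is an assumption and not something tested here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: many "start followed by end" rules evaluated in one loop, per org. */
class MultiRuleSketch {

    /** Simplified stand-in for the Rule class in the post (optional middle step omitted). */
    static final class Rule {
        final String name;
        final String start;
        final String end;

        Rule(String name, String start, String end) {
            this.name = name;
            this.start = start;
            this.end = end;
        }
    }

    private final List<Rule> rules;

    // orgId -> names of rules whose start event was seen and that still await their end event
    private final Map<String, Set<String>> started = new HashMap<>();

    MultiRuleSketch(List<Rule> rules) {
        this.rules = rules;
    }

    /** Feeds one event and returns the names of the rules this event completes for that org. */
    List<String> onEvent(String orgId, String process) {
        Set<String> open = started.computeIfAbsent(orgId, k -> new HashSet<>());
        List<String> matched = new ArrayList<>();
        for (Rule rule : rules) {
            if (open.contains(rule.name) && process.equals(rule.end)) {
                // The rule was started earlier for this org and now sees its end event.
                open.remove(rule.name);
                matched.add(rule.name);
            } else if (process.equals(rule.start)) {
                // Remember that this rule has seen its start event for this org.
                open.add(rule.name);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<Rule> rules = new ArrayList<>();
        rules.add(new Rule("rule0", "process1", "process3"));
        rules.add(new Rule("rule500", "process4", "process6"));
        MultiRuleSketch matcher = new MultiRuleSketch(rules);

        matcher.onEvent("org1", "process1"); // starts rule0 for org1
        matcher.onEvent("org2", "process4"); // starts rule500 for org2
        System.out.println(matcher.onEvent("org1", "process3")); // prints [rule0]
        System.out.println(matcher.onEvent("org2", "process3")); // prints []
    }
}
```

The point of the sketch is only the shape: the number of rules affects the length of one loop rather than the number of operators in the job graph. Whether the CEP library itself can do the equivalent is exactly what we would like to know.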



--
This message was sent by Atlassian Jira
(v8.3.4#803005)