Hi devs,
I’m using Flink 1.5-SNAPSHOT, and I’ve got a connected stream that I’m using with a CoProcessFunction.

One of the streams is keyed, and the other is broadcast.

As per the documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.html), I tried to set a timer, but that fails with:

    java.lang.UnsupportedOperationException: Setting timers is only supported on a keyed streams.
        at org.apache.flink.streaming.api.operators.co.CoProcessOperator$ContextImpl.registerProcessingTimeTimer(CoProcessOperator.java:123)

CoProcessOperator.java has:

    @Override
    public void registerProcessingTimeTimer(long time) {
        throw new UnsupportedOperationException("Setting timers is only supported on a keyed streams.");
    }

    @Override
    public void registerEventTimeTimer(long time) {
        throw new UnsupportedOperationException("Setting timers is only supported on a keyed streams.");
    }

So it seems like the documentation is wrong, and you currently can’t use timers with CoProcessFunction.

If that’s true, I’m curious why. Is it just an implementation detail, or is there a fundamental architectural problem?

I can see some challenges with needing two onTimerX() methods, and thus different timer services for each method, etc.

Thanks,

Ken

--------------------------------------------
http://about.me/kkrugler
+1 530-210-6378
Hi Ken,
It is true that there is no reason not to have access to the timerService from the processElement of the keyed side. On the other (non-keyed) side you cannot set timers, because timers are bound to a specific key.

Now, if one stream is broadcast and the other is keyed, then Flink 1.5 also has BroadcastState, which does exactly what you are describing.

Unfortunately the documentation is still being prepared, but I will open a Pull Request today and send you the link so that you can have a look.

Kostas

> On Apr 26, 2018, at 2:37 AM, Ken Krugler <[hidden email]> wrote:
>
> So it seems like the documentation is wrong, and you currently can’t use timers with CoProcessFunction.
>
> If that’s true, I’m curious why. Is it just an implementation detail, or is there a fundamental architectural problem?
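The point that timers are bound to a specific key can be illustrated with a small toy model. This is only a sketch under stated assumptions, not Flink's implementation: the class KeyedTimerSketch, its nested TimerService, and the methods setCurrentKey and advanceTo are hypothetical names invented for the illustration, and it uses only the Java standard library. It shows that a timer is stored together with the key that was active at registration, so a registration made while no key is active (as on the broadcast side) has nothing to attach to and is rejected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model only: all names here are hypothetical, none of this is Flink code.
public class KeyedTimerSketch {

    /** Stores each timer together with the key that was active when it was registered. */
    static final class TimerService {
        // timestamp -> keys that have a timer at that timestamp (sorted for deterministic firing)
        private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();
        // null while an element from the non-keyed (broadcast) side is being processed
        private String currentKey;

        void setCurrentKey(String key) {
            currentKey = key;
        }

        void registerProcessingTimeTimer(long time) {
            if (currentKey == null) {
                // Without a key there is nothing to bind the timer to.
                throw new UnsupportedOperationException(
                        "No current key: timers can only be set from the keyed side.");
            }
            timers.computeIfAbsent(time, t -> new TreeSet<>()).add(currentKey);
        }

        /** Fires every timer with timestamp <= now and returns "key@timestamp" for each. */
        List<String> advanceTo(long now) {
            List<String> fired = new ArrayList<>();
            Map<Long, TreeSet<String>> due = timers.headMap(now, true);
            for (Map.Entry<Long, TreeSet<String>> entry : due.entrySet()) {
                for (String key : entry.getValue()) {
                    fired.add(key + "@" + entry.getKey());
                }
            }
            due.clear(); // the view is backed by the map, so this removes the fired timers
            return fired;
        }
    }

    public static void main(String[] args) {
        TimerService timerService = new TimerService();

        // Keyed side: a key is active, so registration succeeds.
        timerService.setCurrentKey("user-1");
        timerService.registerProcessingTimeTimer(100L);
        timerService.setCurrentKey("user-2");
        timerService.registerProcessingTimeTimer(50L);

        // Broadcast side: no key is active, so registration is rejected.
        timerService.setCurrentKey(null);
        try {
            timerService.registerProcessingTimeTimer(10L);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        System.out.println(timerService.advanceTo(75L));  // [user-2@50]
        System.out.println(timerService.advanceTo(200L)); // [user-1@100]
    }
}
```

In this model each fired timer comes back with its key, which is what lets a callback run against that key's state. The rejected registration plays the role of the UnsupportedOperationException quoted in the original message.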
Hi again Ken,
This is the PR I promised: https://github.com/apache/flink/pull/5922

You can build the docs by going into the docs directory of the Flink repo and executing:

    ./build_docs.sh -p

After it finishes, you will be able to see all the documentation at localhost:4000 and the Broadcast State page at:

http://localhost:4000/dev/stream/state/broadcast_state.html

Any feedback is welcome!

Cheers,
Kostas

> On Apr 26, 2018, at 11:09 AM, Kostas Kloudas <[hidden email]> wrote:
>
> Unfortunately the documentation is being prepared but I will open a Pull Request today and I can send
> you the link so that you can have a look.