CoProcessFunction doesn't support timer on keyed stream

Ken Krugler
Hi devs,

I’m using Flink 1.5-SNAPSHOT, and I’ve got a connected stream that I’m using with a CoProcessFunction.

One of the streams is keyed, and the other is broadcast.

As per the documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.html), I tried to set a timer, but that fails with:

java.lang.UnsupportedOperationException: Setting timers is only supported on a keyed streams.
        at org.apache.flink.streaming.api.operators.co.CoProcessOperator$ContextImpl.registerProcessingTimeTimer(CoProcessOperator.java:123)

CoProcessOperator.java has:

                @Override
                public void registerProcessingTimeTimer(long time) {
                        throw new UnsupportedOperationException("Setting timers is only supported on a keyed streams.");
                }

                @Override
                public void registerEventTimeTimer(long time) {
                        throw new UnsupportedOperationException("Setting timers is only supported on a keyed streams.");
                }

So it seems like the documentation is wrong, and you currently can’t use timers with CoProcessFunction.

If that’s true, I’m curious why. Is it just an implementation detail, or is there a fundamental architectural problem?

I can see some challenges with needing two onTimerX() methods, and thus different timer services for each method, etc.

Thanks,

— Ken

--------------------------------------------
http://about.me/kkrugler
+1 530-210-6378

Re: CoProcessFunction doesn't support timer on keyed stream

Kostas Kloudas
Hi Ken,

It is true that there is no reason why processElement on the keyed side should not have access to the
timerService. On the other side (the non-keyed side), you cannot set timers, because timers are bound
to a specific key.
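To make the "timers are bound to a key" point concrete, here is a toy model in plain Java. It is not Flink's implementation; the class `ToyTimerService` and its methods `setCurrentKey`, `registerTimer` and `fire` are hypothetical names used only for illustration, and the exception message is borrowed from the stack trace above. Timers are stored per (timestamp, key), so registration only works while a current key is set, as it is for elements of the keyed input but not for elements of the broadcast input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Toy model (not Flink code): timers are scoped to the key of the element being processed. */
class ToyTimerService {
    // timestamp -> keys that registered a timer for that timestamp (a set, so duplicates collapse)
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();
    // null while processing an element from the non-keyed (broadcast) input
    private String currentKey;

    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    void registerTimer(long time) {
        if (currentKey == null) {
            // No key means there is nothing to scope the timer to, hence the failure seen in the thread.
            throw new UnsupportedOperationException("Setting timers is only supported on a keyed streams.");
        }
        timers.computeIfAbsent(time, t -> new TreeSet<>()).add(currentKey);
    }

    /** Removes and returns all timers with timestamp <= now as "key@time", in timestamp order. */
    List<String> fire(long now) {
        List<String> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, TreeSet<String>> e = timers.pollFirstEntry();
            for (String key : e.getValue()) {
                // A real keyed operator would restore this key as the current key before calling onTimer().
                fired.add(key + "@" + e.getKey());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        ToyTimerService ts = new ToyTimerService();
        ts.setCurrentKey("user-1"); // element from the keyed input
        ts.registerTimer(100L);
        ts.setCurrentKey("user-2");
        ts.registerTimer(50L);
        ts.setCurrentKey(null);     // element from the broadcast input: no key
        try {
            ts.registerTimer(75L);
        } catch (UnsupportedOperationException expected) {
            System.out.println("broadcast side: " + expected.getMessage());
        }
        System.out.println(ts.fire(100L)); // [user-2@50, user-1@100]
    }
}
```

Because each timer carries its key, a firing timer knows whose keyed state to work with; an element from the broadcast input has no such scope.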

Now, if one stream is broadcast and the other is keyed, then Flink 1.5 also has BroadcastState, which
does exactly what you are describing.
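A rough plain-Java model of that broadcast state pattern follows. It is a toy, not Flink's API: `ToyBroadcastJob`, `Instance`, `broadcast`, `keyed` and the threshold rule are hypothetical. In Flink 1.5 the real counterpart is a `KeyedBroadcastProcessFunction` applied to `keyedStream.connect(broadcastStream)`, where `processBroadcastElement` has read-write access to the broadcast state and `processElement` has read-only access to it plus a timer service. The model shows the two properties that matter here: every parallel instance receives every broadcast element, so its copy of the state stays identical, and only the keyed side, which has a current key, registers timers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model (not Flink code) of the broadcast state pattern on a keyed + broadcast connected stream. */
class ToyBroadcastJob {

    /** One parallel instance of the two-input operator. */
    static class Instance {
        // Broadcast state: metric name -> alert threshold, replicated on every instance.
        final Map<String, Integer> thresholds = new HashMap<>();
        // Keyed timers: timestamp -> keys; only the keyed side registers them.
        final TreeMap<Long, List<String>> timers = new TreeMap<>();

        /** Broadcast side: read-write access to the broadcast state, no current key. */
        void processBroadcastElement(String metric, int threshold) {
            thresholds.put(metric, threshold);
        }

        /** Keyed side: only reads the broadcast state, and may set per-key timers. */
        void processElement(String key, String metric, int value, long time, List<String> out) {
            Integer limit = thresholds.get(metric);
            if (limit != null && value > limit) {
                out.add("alert " + key + " " + metric + "=" + value);
                // Schedule a re-check for this key; allowed because the element has a key.
                timers.computeIfAbsent(time + 10, t -> new ArrayList<>()).add(key);
            }
        }
    }

    final List<Instance> instances = new ArrayList<>();

    ToyBroadcastJob(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
    }

    /** Broadcast partitioning: every parallel instance gets the element. */
    void broadcast(String metric, int threshold) {
        for (Instance inst : instances) {
            inst.processBroadcastElement(metric, threshold);
        }
    }

    /** Key partitioning: a given key always goes to the same instance. */
    void keyed(String key, String metric, int value, long time, List<String> out) {
        int idx = Math.floorMod(key.hashCode(), instances.size());
        instances.get(idx).processElement(key, metric, value, time, out);
    }

    public static void main(String[] args) {
        ToyBroadcastJob job = new ToyBroadcastJob(4);
        List<String> out = new ArrayList<>();
        job.broadcast("cpu", 80);                  // the rule reaches all 4 instances
        job.keyed("host-a", "cpu", 95, 1000L, out);
        job.keyed("host-b", "cpu", 40, 1000L, out);
        System.out.println(out);                   // [alert host-a cpu=95]
    }
}
```

The model does not try to order the two inputs: a keyed element that arrives before any rule simply finds no threshold and emits nothing.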

Unfortunately, the documentation is still being prepared, but I will open a pull request today and send
you the link so that you can have a look.

Kostas

> On Apr 26, 2018, at 2:37 AM, Ken Krugler wrote:

Re: CoProcessFunction doesn't support timer on keyed stream

Kostas Kloudas
Hi again Ken,

Here is the PR I promised: https://github.com/apache/flink/pull/5922

You can build the docs by going into the docs directory of the Flink repo and executing:
        ./build_docs.sh -p

After it finishes, you will be able to see all the documentation at localhost:4000, and
the Broadcast State page at:

       http://localhost:4000/dev/stream/state/broadcast_state.html

Any feedback is welcome!

Cheers,
Kostas

> On Apr 26, 2018, at 11:09 AM, Kostas Kloudas wrote: