Temporal Table Ordering


Temporal Table Ordering

Dominik Wosiński
Hey,
I have a question about the ordering of messages in a Temporal Table join.
For one of my jobs, I can observe that the input order is correct but the
output order is not.
Say I have two streams that both have an *id* field, which is used both for
the join and for Kafka partitioning. Let's call the streams A and B; stream
B is used to create the temporal table function.

I have added logging for all elements that are deserialized and serialized
by the job, and I can see the following situation:

*Deserializing B with id = 1 and timestamp = 10*
*Deserializing A with id = 1 and timestamp = 20*
*Deserializing A with id = 1 and timestamp = 30*
*Deserializing A with id = 1 and timestamp = 40*
*[Other messages that cause watermarks to be pushed]*

But the logging from serialization schema is:

*Serializing Joined with id = 1 and timestamp = 30*
*Serializing Joined with id = 1 and timestamp = 20*
*Serializing Joined with id = 1 and timestamp = 10*
*Serializing Joined with id = 1 and timestamp = 40*

The input data is properly ordered and properly partitioned on Kafka. Is
there any known issue that might cause this? I have virtually run out of
ideas about why I might be observing it.

I will be glad for any help.
Best Regards,
Dom.
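For context, the buffering behavior described above can be illustrated with a toy model of an event-time temporal join (plain Python; this is an editorial sketch, not Flink's actual TemporalRowTimeJoinOperator, and all names in it are illustrative). The key point: probe-side rows are buffered until a watermark passes their timestamps, so output order is driven by how the buffered state is iterated at watermark time, not by arrival order.

```python
# Toy model of an event-time temporal join buffer (illustrative only,
# not Flink's real operator). Build side = stream B, probe side = stream A.
class ToyTemporalJoin:
    def __init__(self):
        self.build = {}   # id -> latest build-side (B) timestamp seen
        self.buffer = []  # buffered probe-side (A) rows awaiting a watermark

    def on_build(self, row_id, ts):
        # A new version of the temporal table for this key.
        self.build[row_id] = ts

    def on_probe(self, row_id, ts):
        # Probe rows cannot be emitted yet: a later B version with a
        # smaller timestamp than the watermark could still arrive.
        self.buffer.append((row_id, ts))

    def on_watermark(self, wm):
        # Emit every buffered probe row whose timestamp the watermark passed.
        # Here the buffer is a plain list, so iteration order is insertion
        # order; a real operator iterating internal keyed state gives no such
        # guarantee, which is one way output order can differ from input order.
        ready = [r for r in self.buffer if r[1] <= wm]
        self.buffer = [r for r in self.buffer if r[1] > wm]
        return [(rid, ts, self.build.get(rid)) for rid, ts in ready]

join = ToyTemporalJoin()
join.on_build(1, 10)         # Deserializing B with id = 1 and timestamp = 10
for ts in (20, 30, 40):
    join.on_probe(1, ts)     # Deserializing A with id = 1, timestamps 20/30/40
out = join.on_watermark(50)  # other messages push the watermark past 40
```

In this toy model the buffer happens to preserve insertion order; the point is only that emission is watermark-driven, so nothing in the mechanism itself ties output order to input order.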

Re: Temporal Table Ordering

Till Rohrmann
Hi Dominik,

Are you using Flink's Table API or SQL? If so, could you share the program
with us so we can see exactly what your user program is doing? Some example
data that reproduces the problem would also help. Moreover, which Flink
version are you running?

Cheers,
Till

On Mon, Nov 23, 2020 at 2:24 PM Dominik Wosiński <[hidden email]> wrote:
