Flink Table from KeyedStream

3 messages

Flink Table from KeyedStream

Dominik Wosiński
Hey,
I was wondering whether it's currently possible to use a KeyedStream to create a
properly partitioned Table in Flink 1.11. I have a use case where I want
to first join two streams using Flink SQL and then process the result via
*KeyedProcessFunction*. So I do something like:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._                // expression DSL: $"..."
import org.apache.flink.table.api.bridge.scala._

implicit val env = StreamExecutionEnvironment.getExecutionEnvironment
implicit val ste = StreamTableEnvironment.create(env)
val stream1 = env.addSource(someKafkaConsumer)
val stream2 = env.addSource(otherKafkaConsumer)
// createTemporaryView returns Unit; the views are referenced by name below
ste.createTemporaryView("firstTable",
stream1.keyBy(_.getId()), $"id", $"data", $"name")
ste.createTemporaryView("secondTable",
stream2.keyBy(_.getNumber()), $"number", $"userName", $"metadata")
ste.sqlQuery(
  """
    |SELECT * from firstTable
    |JOIN secondTable ON id = number AND data = metadata
    |""".stripMargin
)


Will the Table API respect the fact that I used `KeyedStream`, and will it keep
the data partitioned by the keys above?

I am asking because when, after the JOIN, I tried *reinterpretAsKeyedStream*, I
was getting a *NullPointerException* when accessing state inside the
KeyedProcessFunction, which suggests that the partitioning has indeed
changed. So is it possible to enforce partitioning when working with the Table
API?
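The suspicion can be illustrated without Flink at all. The sketch below is a plain-Scala stand-in (the `Record` type and the hash-mod `partitionFor` are simplified assumptions, not Flink's actual murmur-hashed key-group assignment): routing by `id` alone and routing by the composite join key `(id, data)` generally send the same record to different subtasks, so state keyed upstream by `id` is no longer where the post-join stream looks for it.

```scala
// Hypothetical record type matching the fields in the snippet above.
case class Record(id: Long, data: String)

// Simplified stand-in for Flink's key-group assignment: route a record to
// one of `parallelism` subtasks by hashing its key. (Flink really uses a
// murmur hash over key groups; the disagreement effect is the same.)
def partitionFor(key: Any, parallelism: Int): Int =
  math.abs(key.hashCode % parallelism)

val parallelism = 4
val records = (0L until 100L).toList.map(i => Record(i, s"meta-$i"))

// Upstream keyBy(_.id) routes by id only; the SQL join repartitions by the
// composite join key (id, data). The two routings generally disagree:
val moved = records.count { r =>
  partitionFor(r.id, parallelism) != partitionFor((r.id, r.data), parallelism)
}
println(s"$moved of ${records.size} records land on a different subtask after the join")
```

Because the routings disagree for most records, `reinterpretAsKeyedStream` (which only *asserts* a partitioning, without repartitioning) would read keyed state on the wrong subtask.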

Thanks in advance,
Best Regards,
Dom.

Re: Flink Table from KeyedStream

Jark Wu-2
Hi Dom,

AFAIK, the Table API will apply a key partitioner based on the join key for the
join operator ([id, data] and [number, metadata] in your case), so the
partitioner from the KeyedStream is not respected.
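Given that, one workaround is to convert the joined Table back to a DataStream and key it again explicitly rather than using reinterpretAsKeyedStream: an explicit keyBy inserts a real hash partitioner, so downstream keyed state resolves on the correct subtask. A sketch, not verified against 1.11 (it continues the snippet from the original post where `ste` and the two views are registered; `MyKeyedProcessFunction` and the key field position are hypothetical):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

val joined = ste.sqlQuery(
  "SELECT * FROM firstTable JOIN secondTable ON id = number AND data = metadata")

// A regular (non-windowed) join emits retractions, hence (Boolean, Row):
// true = insertion, false = retraction.
val joinedStream: DataStream[(Boolean, Row)] = joined.toRetractStream[Row]

joinedStream
  .filter(_._1)                            // keep insertions only (sketch)
  .map(_._2)
  .keyBy(row => row.getField(0).toString)  // assumes field 0 is `id`
  .process(new MyKeyedProcessFunction)     // hypothetical function
```

The extra shuffle costs a network hop, but unlike reinterpretAsKeyedStream it does not rely on a partitioning assumption that the join has already invalidated.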

Best,
Jark

On Thu, 21 Jan 2021 at 21:39, Dominik Wosiński <[hidden email]> wrote:

> Hey,
> I was wondering if that's currently possible to use KeyedStream to create a
> properly partitioned Table in Flink 1.11 ? I have a use case where I wanted
> to first join two streams using Flink SQL and then process them via
> *KeyedProcessFunction.* So I do something like:
>
> implicit val env = StreamExecutionEnvironment.getExecutionEnvironment
> implicit val ste = StreamTableEnvironment.create(env)
> val stream1 = env.addSource(someKafkaConsumer)
> val stream2 = env.addSource(otherKafkaConsumer)
> val table1 = ste.createTemporaryView("firstTable",
> stream1.keyBy(_.getId()), $"id", $"data", $"name")
> val table2 = ste.createTemporaryView("secondTable",
> stream2.keyBy(_.getNumber()), "$number", $"userName", $"metadata")
> ste.sqlQuery(
>   """
>     |SELECT * from firstTable
>     |JOIN secondTable ON id = number AND data = metadata
>     |""".stripMargin
> )
>
>
> Will Table API respect the fact that I used `KeyedStream` and will it keep
> the data partitioned by the keys above ?
>
> I am asking since when after the JOIN I tried to *reinterpretAsKeyedStream
> *I
> was getting the *NullPointerException* when accessing the state inside the
> KeyedProcessFunction which suggests that the partitioning has indeed
> changed. So Is it possible to enforce partitioning when working with Table
> API ??
>
> Thanks in Advance,
> Best Regards,
> Dom.
>

Re: Flink Table from KeyedStream

Dominik Wosiński
Hey,
Thanks for the answer. That's what I've been observing, but I wanted to know
for sure.

Best Regards,
Dom.