Hey,
I was wondering whether it's currently possible to use a KeyedStream to create a properly partitioned Table in Flink 1.11. I have a use case where I want to first join two streams using Flink SQL and then process the result via a KeyedProcessFunction. So I do something like:

implicit val env = StreamExecutionEnvironment.getExecutionEnvironment
implicit val ste = StreamTableEnvironment.create(env)

val stream1 = env.addSource(someKafkaConsumer)
val stream2 = env.addSource(otherKafkaConsumer)

ste.createTemporaryView("firstTable", stream1.keyBy(_.getId()), $"id", $"data", $"name")
ste.createTemporaryView("secondTable", stream2.keyBy(_.getNumber()), $"number", $"userName", $"metadata")

ste.sqlQuery(
  """
    |SELECT * FROM firstTable
    |JOIN secondTable ON id = number AND data = metadata
    |""".stripMargin
)

Will the Table API respect the fact that I used a KeyedStream, and will it keep the data partitioned by the keys above?

I am asking because, when I tried reinterpretAsKeyedStream after the JOIN, I was getting a NullPointerException when accessing state inside the KeyedProcessFunction, which suggests that the partitioning has indeed changed. So is it possible to enforce partitioning when working with the Table API?

Thanks in advance,
Best regards,
Dom.
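(For context, a minimal sketch of what the reinterpretAsKeyedStream attempt described above could look like; the toRetractStream conversion, the String key type, and keying on field 0 as id are assumptions of this sketch, not code from the original message.)

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.datastream.DataStreamUtils
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// A regular (non-windowed) SQL join produces an updating result, so it has to be
// converted with toRetractStream rather than toAppendStream.
val joined = ste.sqlQuery(
  """
    |SELECT * FROM firstTable
    |JOIN secondTable ON id = number AND data = metadata
    |""".stripMargin)
val retracting = joined.toRetractStream[Row]

// reinterpretAsKeyedStream only declares that the stream is already partitioned by this
// key selector; it does not repartition. If the actual partitioning differs, keyed state
// access downstream is unreliable, which is consistent with the NullPointerException above.
val keyed = DataStreamUtils.reinterpretAsKeyedStream(
  retracting.javaStream,
  new KeySelector[(Boolean, Row), String] {
    override def getKey(value: (Boolean, Row)): String = String.valueOf(value._2.getField(0))
  },
  Types.STRING)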
Hi Dom,
AFAIK, the Table API will apply a hash partitioner based on the join key for the join operator, i.e. [id, data] and [number, metadata] in your case. So the partitioning of the KeyedStream is not respected.

Best,
Jark
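(Following up on that point, a minimal sketch of the workaround it implies: instead of reinterpreting the stream, convert the join result back to a DataStream and key it explicitly before the KeyedProcessFunction. The toRetractStream conversion and the field positions 0/1 for id/data are assumptions, not code from the thread.)

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

val joined = ste.sqlQuery(
  """
    |SELECT * FROM firstTable
    |JOIN secondTable ON id = number AND data = metadata
    |""".stripMargin)

// The regular join produces an updating result, hence the retract stream conversion.
val result: DataStream[(Boolean, Row)] = joined.toRetractStream[Row]

// Repartition explicitly by the join-key fields [id, data] so that keyed state in a
// downstream KeyedProcessFunction is valid; unlike reinterpretAsKeyedStream, this
// adds a real shuffle.
val keyedByJoinKey = result.keyBy { case (_, row) =>
  (String.valueOf(row.getField(0)), String.valueOf(row.getField(1)))
}
// keyedByJoinKey.process(new YourKeyedProcessFunction) // placeholder for the actual function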
Hey,
Thanks for the answer. That's what I've been observing, but I wanted to know for sure.

Best regards,
Dom.