[DISCUSS] Enabling more dynamic, or metadata-driven behaviors

[DISCUSS] Enabling more dynamic, or metadata-driven behaviors

Maciej Obuchowski
While working on a project that's strongly metadata-driven I've had to
overcome several deficiencies or assumptions in Flink code. The project
involves reading from hundreds (possibly thousands) of Kafka topics,
each containing Avro messages with different schemas. Various
transformations and aggregations are applied to that data, and the
transformed and aggregated data is then written to several sinks -
JDBC, file, etc. One of the goals is making simple changes possible,
like adding topics or changing transformations or schemas without
writing code - so everything is metadata-driven.

Some of the things I've encountered:

It's not possible to read Avro messages from Kafka without somehow
providing a reader schema from user code. It's simply impractical to
keep thousands of schemas (and Flink's AvroDeserializationSchemas)
around, or even worse, one Kafka consumer per topic/schema.
The solution was to use a custom deserialization schema similar to
this approach (a rough sketch follows the link):
https://stackoverflow.com/questions/58849635/is-it-possible-to-deserialize-avro-messageconsuming-message-from-kafka-without
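
For illustration, a minimal sketch of what I mean, assuming messages
use the Confluent wire format and a schema registry is available
(the class name and the lazy-init layout are just made up for the
example):

import java.util.Collections;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// One deserialization schema for all topics. The writer schema is fetched
// from the registry via the id embedded in each message, so no reader
// schema has to be supplied from user code.
public class RegistryAwareAvroDeserializationSchema
        implements KafkaDeserializationSchema<GenericRecord> {

    private final String registryUrl;
    // the Confluent deserializer is not serializable, so create it lazily
    private transient KafkaAvroDeserializer inner;

    public RegistryAwareAvroDeserializationSchema(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    @Override
    public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> record) {
        if (inner == null) {
            inner = new KafkaAvroDeserializer();
            inner.configure(
                    Collections.singletonMap("schema.registry.url", registryUrl),
                    false);
        }
        return (GenericRecord) inner.deserialize(record.topic(), record.value());
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        // note: this yields a generic type info, which leads straight to the
        // Kryo fallback described in the next point
        return TypeInformation.of(GenericRecord.class);
    }
}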

Another one was regarding serialization. This might be possible, but I
haven't found a way to serialize Avro's generic data types without the
Kryo fallback, which hurts performance. I've resorted to manually
serializing records to byte[] and deserializing them in the next task
(a sketch of that workaround follows below).
Also, see this mail thread where Lasse had a similar case with Jackson
objects:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Jackson-object-serialisations-td41691.html
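
For reference, the manual workaround boils down to a pair of helpers
like the following (a sketch using plain Avro APIs; the downstream
task still needs to know the schema to decode the bytes):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public final class AvroBytes {

    // Encodes the record with its own schema, so Flink only ships a byte[]
    // between tasks and never touches the Kryo fallback.
    public static byte[] toBytes(GenericRecord record) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    // Rebuilds the record in the downstream task from the known schema.
    public static GenericRecord fromBytes(byte[] bytes, Schema schema) throws IOException {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
    }
}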

The third one was the JDBC sink behavior. Currently, it assumes it
will be used to insert data into one table, provided statically. This
one has the worst implications, because each sink consumes one
database connection, and connections are quite expensive when we're
talking about hundreds of them. In this case there was no other way
than forking Flink and providing another implementation of
JdbcBatchStatementExecutor that can create statements for multiple
tables (see the sketch below).

After this lengthy introduction, my question is basically: do Flink
developers and the community welcome further discussion and
contributions aimed at easing these and similar pain points regarding
more "dynamic" behavior of Flink? I'm willing to contribute, but don't
want to just throw code over the wall if no one else is interested in
using it.

Thanks,
Maciej

Re: [DISCUSS] Enabling more dynamic, or metadata-driven behaviors

Till Rohrmann
Hi Maciej,

The Flink community highly appreciates any kind of feedback and
improvement suggestions. Without going into more detail on the
problems you've encountered, it is very likely that other people have
run into something similar as well. Hence, it is a good idea to share
these limitations and to discuss possible solutions for them. If you
already have a working solution, then this could be a good starting
point for the discussion, and in the best case we can take the
solution as it is. What I would suggest is to create a separate JIRA
issue for each problem and start a separate discussion if required.

One thing to note is that the community is working towards the feature
freeze for Flink 1.13, so it can happen that people respond a bit
later.

Cheers,
Till

On Mon, Mar 1, 2021 at 12:58 PM Maciej Obuchowski <
[hidden email]> wrote:
