While working on a project that is strongly metadata-driven, I've had to overcome several deficiencies, or assumptions, in Flink code. The project involves reading from hundreds (possibly thousands) of Kafka topics, each containing Avro messages with different schemas. Various transformations and aggregations are applied to that data, and the transformed and aggregated data is then written to several sinks - JDBC, file, etc. One of the goals is to make simple changes possible - like adding topics or changing transformations or schemas - without writing code, so everything is metadata-driven.

Some of the things I've encountered:

It's not possible to read Avro messages from Kafka without somehow providing a reader schema from user code. It's simply impractical to keep thousands of schemas (and Flink's AvroDeserializationSchemas) around, or even worse, a Kafka consumer per topic/schema. The solution was to use a custom deserialization schema similar to this approach:
https://stackoverflow.com/questions/58849635/is-it-possible-to-deserialize-avro-messageconsuming-message-from-kafka-without

Another one was serialization. This might be possible, but I haven't found a way to serialize Avro's generic data types without the Kryo fallback, which hurts performance. I've resorted to manually serializing each record to byte[] and deserializing it in the next task. Also, see this mail thread where Lasse had a similar case with Jackson objects:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Jackson-object-serialisations-td41691.html

The third one was the JDBC sink behavior. Currently, it assumes that it will be used to insert data into one table, provided statically. This one has the worst implications, because each sink consumes one database connection, and connections are quite expensive when we're talking about hundreds of them. In this case there was no other way than forking Flink and providing another implementation of JdbcBatchStatementExecutor that can create statements for multiple tables.

After this lengthy introduction, my question is basically: do Flink developers and the community welcome further discussion and contributions aimed at easing these and similar pain points regarding more "dynamic" behavior of Flink? I'm willing to contribute, but I don't want to just throw code over the wall if no one else is interested in using it.

Thanks,
Maciej
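A minimal sketch of that custom deserialization approach, assuming Confluent-style framing (a magic byte followed by a 4-byte writer-schema id) and a hypothetical SchemaLookup interface standing in for whatever registry or metadata store supplies the writer schema - not the exact code from the project:

import java.io.Serializable;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical lookup - in practice backed by a schema registry or the
// project's own metadata store.
interface SchemaLookup extends Serializable {
    Schema writerSchemaFor(int schemaId);
}

public class WriterSchemaAvroDeserializationSchema
        implements KafkaDeserializationSchema<GenericRecord> {

    private final SchemaLookup schemaLookup;

    public WriterSchemaAvroDeserializationSchema(SchemaLookup schemaLookup) {
        this.schemaLookup = schemaLookup;
    }

    @Override
    public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        ByteBuffer buffer = ByteBuffer.wrap(record.value());
        buffer.get();                        // skip the magic byte
        int schemaId = buffer.getInt();      // 4-byte writer schema id
        Schema writerSchema = schemaLookup.writerSchemaFor(schemaId);

        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(
                buffer.array(), buffer.position(), buffer.remaining(), null);
        // Read with the writer schema only - no reader schema configured per topic.
        return new GenericDatumReader<GenericRecord>(writerSchema).read(null, decoder);
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        // GenericRecord has no dedicated TypeInformation here, which is exactly
        // what triggers the Kryo fallback described in the next point.
        return TypeInformation.of(GenericRecord.class);
    }
}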
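For the Kryo fallback on GenericRecord, the workaround described above (manually converting records to byte[] between tasks) could look roughly like this; the reader side is assumed to get the writer schema from the same metadata that drives the pipeline:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

/** Converts GenericRecord to/from byte[] so Flink ships plain byte arrays between tasks. */
public final class AvroBytes {

    private AvroBytes() {}

    public static byte[] toBytes(GenericRecord record) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    // The reader needs the writer schema again; here it is assumed to come
    // from the metadata store rather than being carried in the bytes.
    public static GenericRecord fromBytes(byte[] bytes, Schema writerSchema) throws IOException {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return new GenericDatumReader<GenericRecord>(writerSchema).read(null, decoder);
    }
}

Typical use would be a map to byte[] right before a shuffle and the reverse map in the next task, so only byte arrays cross the network.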
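And for the JDBC sink, the multi-table idea is roughly the following, sketched against the JdbcBatchStatementExecutor interface as it looks around Flink 1.12 (an internal API, so names and signatures may differ), with the routing functions standing in for whatever the metadata provides:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;

/** Routes each record to a PreparedStatement for its target table, so one sink
 *  (and one database connection) can serve many tables. */
public class MultiTableStatementExecutor<T> implements JdbcBatchStatementExecutor<T> {

    // In a real job these would need to be Serializable (or built in open()).
    private final Function<T, String> tableRouter;              // record -> target table name
    private final Function<String, String> insertSqlForTable;   // table -> "INSERT ... VALUES (?, ...)"
    private final JdbcStatementBuilder<T> parameterSetter;      // fills the placeholders from the record

    private transient Connection connection;
    private transient Map<String, PreparedStatement> statements;

    public MultiTableStatementExecutor(
            Function<T, String> tableRouter,
            Function<String, String> insertSqlForTable,
            JdbcStatementBuilder<T> parameterSetter) {
        this.tableRouter = tableRouter;
        this.insertSqlForTable = insertSqlForTable;
        this.parameterSetter = parameterSetter;
    }

    @Override
    public void prepareStatements(Connection connection) throws SQLException {
        this.connection = connection;
        this.statements = new HashMap<>();
    }

    @Override
    public void addToBatch(T record) throws SQLException {
        String table = tableRouter.apply(record);
        PreparedStatement statement = statements.get(table);
        if (statement == null) {
            // One prepared statement per table, all sharing a single connection.
            statement = connection.prepareStatement(insertSqlForTable.apply(table));
            statements.put(table, statement);
        }
        parameterSetter.accept(statement, record);
        statement.addBatch();
    }

    @Override
    public void executeBatch() throws SQLException {
        for (PreparedStatement statement : statements.values()) {
            statement.executeBatch();
        }
    }

    @Override
    public void closeStatements() throws SQLException {
        for (PreparedStatement statement : statements.values()) {
            statement.close();
        }
        statements.clear();
    }
}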
Hi Maciej,
The Flink community highly appreciates any kind of feedback and improvement suggestions. Without going into more detail on the problems you've encountered, it is very likely that other people have run into something similar as well. Hence, it is a good idea to share these limitations and to discuss possible solutions for them. If you already have a working solution, then this could be a good starting point for the discussion, and in the best case we can take the solution as it is.

What I would suggest is to create a separate JIRA issue for each problem and start a separate discussion if required.

One thing to note is that the community is working towards the feature freeze for Flink 1.13. Due to this it can happen that people might respond a bit later.

Cheers,
Till