Hi guys,
Let's discuss the new FLIP proposal for model serving over Flink. The idea is to combine previous efforts there and provide a library on top of Flink for serving models. https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving Code from previous efforts can be found here: https://github.com/FlinkML Best, Stavros |
Hi Stavros,
thanks for the detailed FLIP! Model serving is an important use case and it's great to see efforts to add a library for this to Flink! I've read the FLIP and would like to ask a few questions and make some suggestions. 1) Is it a strict requirement that a ML pipeline must be able to handle different input types? I understand that it makes sense to have different models for different instances of the same type, i.e., same data type but different keys. Hence, the key-based joins make sense to me. However, couldn't completely different types be handled by different ML pipelines or would there be major drawbacks? 2) I think from an API point of view it would be better to not require input records to be encoded as ProtoBuf messages. Instead, the model server could accept strongly-typed objects (Java/Scala) and (if necessary) convert them to ProtoBuf messages internally. In case we need to support different types of records (see my first point), we can introduce a Union type (i.e., an n-ary Either type). I see that we need some kind of binary encoding format for the models but maybe also this can be designed to be pluggable such that later other encodings can be added. 3) I think the DataStream Java API should be supported as a first class citizens for this library. 4) For the integration with the DataStream API, we could provide an API that receives (typed) DataStream objects, internally constructs the DataStream operators, and returns one (or more) result DataStreams. The benefit is that we don't need to change the DataStream API directly, but put a library on top. The other libraries (CEP, Table, Gelly) follow this approach. 5) I'm skeptical about using queryable state to expose metrics. Did you consider using Flink's metrics system [1]? It is easily configurable and we provided several reporters that export the metrics. What do you think? Best, Fabian [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html 2017-11-23 12:32 GMT+01:00 Stavros Kontopoulos <[hidden email]>: > Hi guys, > > Let's discuss the new FLIP proposal for model serving over Flink. The idea > is to combine previous efforts there and provide a library on top of Flink > for serving models. > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving > > Code from previous efforts can be found here: https://github.com/FlinkML > > Best, > Stavros > |
Hi Fabian thanx!
> 1) Is it a strict requirement that a ML pipeline must be able to handle > different input types? > I understand that it makes sense to have different models for different > instances of the same type, i.e., same data type but different keys. Hence, > the key-based joins make sense to me. However, couldn't completely > different types be handled by different ML pipelines or would there be > major drawbacks? Could you elaborate more on this? Right now we only use keys when we do the join. A given pipeline can handle only a well defined type (the type can be a simple string with a custom value, no need to be a class type) which serves as a key. 2) I think from an API point of view it would be better to not require > input records to be encoded as ProtoBuf messages. Instead, the model server > could accept strongly-typed objects (Java/Scala) and (if necessary) convert > them to ProtoBuf messages internally. In case we need to support different > types of records (see my first point), we can introduce a Union type (i.e., > an n-ary Either type). I see that we need some kind of binary encoding > format for the models but maybe also this can be designed to be pluggable > such that later other encodings can be added. > We do uses scala classes (strongly typed classes), protobuf is only used on the wire. For on the wire encoding we prefer protobufs for size, expressiveness and ability to represent different data types. 3) I think the DataStream Java API should be supported as a first class > citizens for this library. I agree. It should be either first priority or a next thing to do. 4) For the integration with the DataStream API, we could provide an API that > receives (typed) DataStream objects, internally constructs the DataStream > operators, and returns one (or more) result DataStreams. The benefit is > that we don't need to change the DataStream API directly, but put a library > on top. The other libraries (CEP, Table, Gelly) follow this approach. We will provide a DSL which will do jsut this. But even without the DSL this is what we do with low level joins. 5) > I'm skeptical about using queryable state to expose metrics. Did you > consider using Flink's metrics system [1]? It is easily configurable and we > provided several reporters that export the metrics. > This of course is an option. The choice of queryable state was mostly driven by a simplicity of real time integration. Any reason why metrics system is netter? Best, Stavros On Mon, Nov 27, 2017 at 4:23 PM, Fabian Hueske <[hidden email]> wrote: > Hi Stavros, > > thanks for the detailed FLIP! > Model serving is an important use case and it's great to see efforts to add > a library for this to Flink! > > I've read the FLIP and would like to ask a few questions and make some > suggestions. > > 1) Is it a strict requirement that a ML pipeline must be able to handle > different input types? > I understand that it makes sense to have different models for different > instances of the same type, i.e., same data type but different keys. Hence, > the key-based joins make sense to me. However, couldn't completely > different types be handled by different ML pipelines or would there be > major drawbacks? > > 2) I think from an API point of view it would be better to not require > input records to be encoded as ProtoBuf messages. Instead, the model server > could accept strongly-typed objects (Java/Scala) and (if necessary) convert > them to ProtoBuf messages internally. In case we need to support different > types of records (see my first point), we can introduce a Union type (i.e., > an n-ary Either type). I see that we need some kind of binary encoding > format for the models but maybe also this can be designed to be pluggable > such that later other encodings can be added. > > 3) I think the DataStream Java API should be supported as a first class > citizens for this library. > > 4) For the integration with the DataStream API, we could provide an API > that receives (typed) DataStream objects, internally constructs the > DataStream operators, and returns one (or more) result DataStreams. The > benefit is that we don't need to change the DataStream API directly, but > put a library on top. The other libraries (CEP, Table, Gelly) follow this > approach. > > 5) I'm skeptical about using queryable state to expose metrics. Did you > consider using Flink's metrics system [1]? It is easily configurable and we > provided several reporters that export the metrics. > > What do you think? > Best, Fabian > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/ > metrics.html > > 2017-11-23 12:32 GMT+01:00 Stavros Kontopoulos <[hidden email]>: > > > Hi guys, > > > > Let's discuss the new FLIP proposal for model serving over Flink. The > idea > > is to combine previous efforts there and provide a library on top of > Flink > > for serving models. > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP- > 23+-+Model+Serving > > > > Code from previous efforts can be found here: https://github.com/FlinkML > > > > Best, > > Stavros > > > |
Hi Boris and Stavros,
Thanks for the responses. Ad 1) Thanks for the clarification. I think I misunderstood this part of the proposal. I interpreted the argument why to chose ProtoBuf for network encoding ("ability to represent different data types") such that different a model pipeline should work on different data types. I agree that it should be possible to give records of the same type (but with different keys) to different models. The key-based join approach looks good to me. Ad 2) I understand that ProtoBuf is a good choice to serialize models for the given reasons. However, the choice of ProtoBuf serialization for the records might make the integration with existing libraries and also regular DataStream programs more difficult. They all use Flink's TypeSerializer system to serialize and deserialize records by default. Hence, we would need to add a conversion step before records can be passed to a model serving operator. Are you expecting some common format that all records follow (such as a Row or Vector type) or do you plan to support arbitrary records such as Pojos? If you plan for a specific type, you could add a TypeInformation for this type with a TypeSerializer that is based on ProtoBuf. Ad 4) @Boris: I made this point not about the serialization format but how the library would integrate with Flink's DataStream API. I thought I had seen a code snippet that showed a new method on the DataStream object but cannot find this anymore. So, I just wanted to make the point that we should not change the DataStream API (unless it lacks support for some features) and built the model serving library on top of it. But I get from Stavros answer that this is your design anyway. Ad 5) The metrics system is the default way to expose system and job metrics in Flink. Due to the pluggable reporter interface and various reporters, they can be easily integrated in many production environments. A solution based on queryable state will always need custom code to access the information. Of course this can be an optional feature. What do others think about this proposal? Best, Fabian 2017-11-28 13:53 GMT+01:00 Stavros Kontopoulos <[hidden email]>: > Hi Fabian thanx! > > > > 1) Is it a strict requirement that a ML pipeline must be able to handle > > different input types? > > I understand that it makes sense to have different models for different > > instances of the same type, i.e., same data type but different keys. > Hence, > > the key-based joins make sense to me. However, couldn't completely > > different types be handled by different ML pipelines or would there be > > major drawbacks? > > > Could you elaborate more on this? Right now we only use keys when we do the > join. A given pipeline can handle only a well defined type (the type can be > a simple string with a custom value, no need to be a > class type) which serves as a key. > > 2) > > I think from an API point of view it would be better to not require > > input records to be encoded as ProtoBuf messages. Instead, the model > server > > could accept strongly-typed objects (Java/Scala) and (if necessary) > convert > > them to ProtoBuf messages internally. In case we need to support > different > > types of records (see my first point), we can introduce a Union type > (i.e., > > an n-ary Either type). I see that we need some kind of binary encoding > > format for the models but maybe also this can be designed to be pluggable > > such that later other encodings can be added. > > > We do uses scala classes (strongly typed classes), protobuf is only used > on the wire. For on the wire encoding we prefer protobufs for size, > expressiveness and ability to represent different data types. > > 3) > > I think the DataStream Java API should be supported as a first class > > citizens for this library. > > > I agree. It should be either first priority or a next thing to do. > > > 4) > > For the integration with the DataStream API, we could provide an API that > > receives (typed) DataStream objects, internally constructs the DataStream > > operators, and returns one (or more) result DataStreams. The benefit is > > that we don't need to change the DataStream API directly, but put a > library > > on top. The other libraries (CEP, Table, Gelly) follow this approach. > > > We will provide a DSL which will do jsut this. But even without the DSL > this is what we do with low level joins. > > > 5) > > > I'm skeptical about using queryable state to expose metrics. Did you > > consider using Flink's metrics system [1]? It is easily configurable and > we > > provided several reporters that export the metrics. > > > This of course is an option. The choice of queryable state was mostly > driven by a simplicity of real time integration. Any reason why metrics > system is netter? > > > Best, > Stavros > > On Mon, Nov 27, 2017 at 4:23 PM, Fabian Hueske <[hidden email]> wrote: > > > Hi Stavros, > > > > thanks for the detailed FLIP! > > Model serving is an important use case and it's great to see efforts to > add > > a library for this to Flink! > > > > I've read the FLIP and would like to ask a few questions and make some > > suggestions. > > > > 1) Is it a strict requirement that a ML pipeline must be able to handle > > different input types? > > I understand that it makes sense to have different models for different > > instances of the same type, i.e., same data type but different keys. > Hence, > > the key-based joins make sense to me. However, couldn't completely > > different types be handled by different ML pipelines or would there be > > major drawbacks? > > > > 2) I think from an API point of view it would be better to not require > > input records to be encoded as ProtoBuf messages. Instead, the model > server > > could accept strongly-typed objects (Java/Scala) and (if necessary) > convert > > them to ProtoBuf messages internally. In case we need to support > different > > types of records (see my first point), we can introduce a Union type > (i.e., > > an n-ary Either type). I see that we need some kind of binary encoding > > format for the models but maybe also this can be designed to be pluggable > > such that later other encodings can be added. > > > > 3) I think the DataStream Java API should be supported as a first class > > citizens for this library. > > > > 4) For the integration with the DataStream API, we could provide an API > > that receives (typed) DataStream objects, internally constructs the > > DataStream operators, and returns one (or more) result DataStreams. The > > benefit is that we don't need to change the DataStream API directly, but > > put a library on top. The other libraries (CEP, Table, Gelly) follow this > > approach. > > > > 5) I'm skeptical about using queryable state to expose metrics. Did you > > consider using Flink's metrics system [1]? It is easily configurable and > we > > provided several reporters that export the metrics. > > > > What do you think? > > Best, Fabian > > > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/ > > metrics.html > > > > 2017-11-23 12:32 GMT+01:00 Stavros Kontopoulos <[hidden email] > >: > > > > > Hi guys, > > > > > > Let's discuss the new FLIP proposal for model serving over Flink. The > > idea > > > is to combine previous efforts there and provide a library on top of > > Flink > > > for serving models. > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP- > > 23+-+Model+Serving > > > > > > Code from previous efforts can be found here: > https://github.com/FlinkML > > > > > > Best, > > > Stavros > > > > > > |
Hi,
Sorry for the late follow up. I think I understand the motivation for choosing ProtoBuf as the representation and serialization format and this makes sense to me. However, it might be a good idea to provide tooling to convert Flink types (described as TypeInformation) to ProtoBuf. Otherwise, users of the model serving library would need to manually convert their data types (say Scala tuples, case classes, or Avro Pojos) to ProtoBuf messages. I don't think that this needs to be included in the first version but it might be a good extension to make the library easier to use. Best, Fabian 2017-11-28 17:22 GMT+01:00 Boris Lublinsky <[hidden email]>: > Thanks Fabian, > More below > > > > Boris Lublinsky > FDP Architect > [hidden email] > https://www.lightbend.com/ > > On Nov 28, 2017, at 8:21 AM, Fabian Hueske <[hidden email]> wrote: > > Hi Boris and Stavros, > > Thanks for the responses. > > Ad 1) Thanks for the clarification. I think I misunderstood this part of > the proposal. > I interpreted the argument why to chose ProtoBuf for network encoding ("ability > to represent different data types") such that different a model pipeline > should work on different data types. > I agree that it should be possible to give records of the same type (but > with different keys) to different models. The key-based join approach looks > good to me. > > Ad 2) I understand that ProtoBuf is a good choice to serialize models for > the given reasons. > However, the choice of ProtoBuf serialization for the records might make > the integration with existing libraries and also regular DataStream > programs more difficult. > They all use Flink's TypeSerializer system to serialize and deserialize > records by default. Hence, we would need to add a conversion step before > records can be passed to a model serving operator. > Are you expecting some common format that all records follow (such as a > Row or Vector type) or do you plan to support arbitrary records such as > Pojos? > If you plan for a specific type, you could add a TypeInformation for this > type with a TypeSerializer that is based on ProtoBuf. > > The way I look at it is slightly different. The common format for records, > supported by Flink, is Byte array with a little bit of header, describing > data type and is used for routing. The actual unmarshalling is done by the > model implementation itself. This provides the maximum flexibility and > gives user the freedom to create his own types without breaking underlying > framework. > > Ad 4) @Boris: I made this point not about the serialization format but how > the library would integrate with Flink's DataStream API. > I thought I had seen a code snippet that showed a new method on the > DataStream object but cannot find this anymore. > So, I just wanted to make the point that we should not change the > DataStream API (unless it lacks support for some features) and built the > model serving library on top of it. > But I get from Stavros answer that this is your design anyway. > > Ad 5) The metrics system is the default way to expose system and job > metrics in Flink. Due to the pluggable reporter interface and various > reporters, they can be easily integrated in many production environments. > A solution based on queryable state will always need custom code to access > the information. Of course this can be an optional feature. > > What do others think about this proposal? > > We had agreement among work group - Eron, Bas, Andrea, etc, but you are > the first one outside of it. My book https://www.lightbend. > com/blog/serving-machine-learning-models-free-oreilly-ebook-from-lightbend has > a reasonably good reviews, so we are hoping this will work > > > Best, Fabian > > > 2017-11-28 13:53 GMT+01:00 Stavros Kontopoulos <[hidden email]>: > >> Hi Fabian thanx! >> >> >> > 1) Is it a strict requirement that a ML pipeline must be able to handle >> > different input types? >> > I understand that it makes sense to have different models for different >> > instances of the same type, i.e., same data type but different keys. >> Hence, >> > the key-based joins make sense to me. However, couldn't completely >> > different types be handled by different ML pipelines or would there be >> > major drawbacks? >> >> >> Could you elaborate more on this? Right now we only use keys when we do >> the >> join. A given pipeline can handle only a well defined type (the type can >> be >> a simple string with a custom value, no need to be a >> class type) which serves as a key. >> >> 2) >> >> I think from an API point of view it would be better to not require >> > input records to be encoded as ProtoBuf messages. Instead, the model >> server >> > could accept strongly-typed objects (Java/Scala) and (if necessary) >> convert >> > them to ProtoBuf messages internally. In case we need to support >> different >> > types of records (see my first point), we can introduce a Union type >> (i.e., >> > an n-ary Either type). I see that we need some kind of binary encoding >> > format for the models but maybe also this can be designed to be >> pluggable >> > such that later other encodings can be added. >> > >> We do uses scala classes (strongly typed classes), protobuf is only used >> on the wire. For on the wire encoding we prefer protobufs for size, >> expressiveness and ability to represent different data types. >> >> 3) >> >> I think the DataStream Java API should be supported as a first class >> > citizens for this library. >> >> >> I agree. It should be either first priority or a next thing to do. >> >> >> 4) >> >> For the integration with the DataStream API, we could provide an API that >> > receives (typed) DataStream objects, internally constructs the >> DataStream >> > operators, and returns one (or more) result DataStreams. The benefit is >> > that we don't need to change the DataStream API directly, but put a >> library >> > on top. The other libraries (CEP, Table, Gelly) follow this approach. >> >> >> We will provide a DSL which will do jsut this. But even without the DSL >> this is what we do with low level joins. >> >> >> 5) >> >> > I'm skeptical about using queryable state to expose metrics. Did you >> > consider using Flink's metrics system [1]? It is easily configurable >> and we >> > provided several reporters that export the metrics. >> > >> This of course is an option. The choice of queryable state was mostly >> driven by a simplicity of real time integration. Any reason why metrics >> system is netter? >> >> >> Best, >> Stavros >> >> On Mon, Nov 27, 2017 at 4:23 PM, Fabian Hueske <[hidden email]> wrote: >> >> > Hi Stavros, >> > >> > thanks for the detailed FLIP! >> > Model serving is an important use case and it's great to see efforts to >> add >> > a library for this to Flink! >> > >> > I've read the FLIP and would like to ask a few questions and make some >> > suggestions. >> > >> > 1) Is it a strict requirement that a ML pipeline must be able to handle >> > different input types? >> > I understand that it makes sense to have different models for different >> > instances of the same type, i.e., same data type but different keys. >> Hence, >> > the key-based joins make sense to me. However, couldn't completely >> > different types be handled by different ML pipelines or would there be >> > major drawbacks? >> > >> > 2) I think from an API point of view it would be better to not require >> > input records to be encoded as ProtoBuf messages. Instead, the model >> server >> > could accept strongly-typed objects (Java/Scala) and (if necessary) >> convert >> > them to ProtoBuf messages internally. In case we need to support >> different >> > types of records (see my first point), we can introduce a Union type >> (i.e., >> > an n-ary Either type). I see that we need some kind of binary encoding >> > format for the models but maybe also this can be designed to be >> pluggable >> > such that later other encodings can be added. >> > >> > 3) I think the DataStream Java API should be supported as a first class >> > citizens for this library. >> > >> > 4) For the integration with the DataStream API, we could provide an API >> > that receives (typed) DataStream objects, internally constructs the >> > DataStream operators, and returns one (or more) result DataStreams. The >> > benefit is that we don't need to change the DataStream API directly, but >> > put a library on top. The other libraries (CEP, Table, Gelly) follow >> this >> > approach. >> > >> > 5) I'm skeptical about using queryable state to expose metrics. Did you >> > consider using Flink's metrics system [1]? It is easily configurable >> and we >> > provided several reporters that export the metrics. >> > >> > What do you think? >> > Best, Fabian >> > >> > [1] >> > https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/ >> > metrics.html >> > >> > 2017-11-23 12:32 GMT+01:00 Stavros Kontopoulos < >> [hidden email]>: >> > >> > > Hi guys, >> > > >> > > Let's discuss the new FLIP proposal for model serving over Flink. The >> > idea >> > > is to combine previous efforts there and provide a library on top of >> > Flink >> > > for serving models. >> > > >> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP- >> > 23+-+Model+Serving >> > > >> > > Code from previous efforts can be found here: >> https://github.com/FlinkML >> > > >> > > Best, >> > > Stavros >> > > >> > >> > > > |
Are there any more comments on the FLIP?
Otherwise, I'd suggest to move the FLIP to the accepted FLIPs [1] and continue with the implementation. Also, is there a committer who'd like to shepherd the FLIP and review the corresponding PRs? Of course, everybody is welcome to review the code but we need at least one committer who will eventually merge the changes. Best, Fabian [1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals 2017-12-04 10:54 GMT+01:00 Fabian Hueske <[hidden email]>: > Hi, > > Sorry for the late follow up. > > I think I understand the motivation for choosing ProtoBuf as the > representation and serialization format and this makes sense to me. > > However, it might be a good idea to provide tooling to convert Flink types > (described as TypeInformation) to ProtoBuf. > Otherwise, users of the model serving library would need to manually > convert their data types (say Scala tuples, case classes, or Avro Pojos) to > ProtoBuf messages. > I don't think that this needs to be included in the first version but it > might be a good extension to make the library easier to use. > > Best, > Fabian > > > > 2017-11-28 17:22 GMT+01:00 Boris Lublinsky <[hidden email]> > : > >> Thanks Fabian, >> More below >> >> >> >> Boris Lublinsky >> FDP Architect >> [hidden email] >> https://www.lightbend.com/ >> >> On Nov 28, 2017, at 8:21 AM, Fabian Hueske <[hidden email]> wrote: >> >> Hi Boris and Stavros, >> >> Thanks for the responses. >> >> Ad 1) Thanks for the clarification. I think I misunderstood this part of >> the proposal. >> I interpreted the argument why to chose ProtoBuf for network encoding ("ability >> to represent different data types") such that different a model pipeline >> should work on different data types. >> I agree that it should be possible to give records of the same type (but >> with different keys) to different models. The key-based join approach looks >> good to me. >> >> Ad 2) I understand that ProtoBuf is a good choice to serialize models for >> the given reasons. >> However, the choice of ProtoBuf serialization for the records might make >> the integration with existing libraries and also regular DataStream >> programs more difficult. >> They all use Flink's TypeSerializer system to serialize and deserialize >> records by default. Hence, we would need to add a conversion step before >> records can be passed to a model serving operator. >> Are you expecting some common format that all records follow (such as a >> Row or Vector type) or do you plan to support arbitrary records such as >> Pojos? >> If you plan for a specific type, you could add a TypeInformation for this >> type with a TypeSerializer that is based on ProtoBuf. >> >> The way I look at it is slightly different. The common format for >> records, supported by Flink, is Byte array with a little bit of header, >> describing data type and is used for routing. The actual unmarshalling is >> done by the model implementation itself. This provides the maximum >> flexibility and gives user the freedom to create his own types without >> breaking underlying framework. >> >> Ad 4) @Boris: I made this point not about the serialization format but >> how the library would integrate with Flink's DataStream API. >> I thought I had seen a code snippet that showed a new method on the >> DataStream object but cannot find this anymore. >> So, I just wanted to make the point that we should not change the >> DataStream API (unless it lacks support for some features) and built the >> model serving library on top of it. >> But I get from Stavros answer that this is your design anyway. >> >> Ad 5) The metrics system is the default way to expose system and job >> metrics in Flink. Due to the pluggable reporter interface and various >> reporters, they can be easily integrated in many production environments. >> A solution based on queryable state will always need custom code to >> access the information. Of course this can be an optional feature. >> >> What do others think about this proposal? >> >> We had agreement among work group - Eron, Bas, Andrea, etc, but you are >> the first one outside of it. My book https://www.lightbend.com >> /blog/serving-machine-learning-models-free-oreilly-ebook-from-lightbend has >> a reasonably good reviews, so we are hoping this will work >> >> >> Best, Fabian >> >> >> 2017-11-28 13:53 GMT+01:00 Stavros Kontopoulos <[hidden email]> >> : >> >>> Hi Fabian thanx! >>> >>> >>> > 1) Is it a strict requirement that a ML pipeline must be able to handle >>> > different input types? >>> > I understand that it makes sense to have different models for different >>> > instances of the same type, i.e., same data type but different keys. >>> Hence, >>> > the key-based joins make sense to me. However, couldn't completely >>> > different types be handled by different ML pipelines or would there be >>> > major drawbacks? >>> >>> >>> Could you elaborate more on this? Right now we only use keys when we do >>> the >>> join. A given pipeline can handle only a well defined type (the type can >>> be >>> a simple string with a custom value, no need to be a >>> class type) which serves as a key. >>> >>> 2) >>> >>> I think from an API point of view it would be better to not require >>> > input records to be encoded as ProtoBuf messages. Instead, the model >>> server >>> > could accept strongly-typed objects (Java/Scala) and (if necessary) >>> convert >>> > them to ProtoBuf messages internally. In case we need to support >>> different >>> > types of records (see my first point), we can introduce a Union type >>> (i.e., >>> > an n-ary Either type). I see that we need some kind of binary encoding >>> > format for the models but maybe also this can be designed to be >>> pluggable >>> > such that later other encodings can be added. >>> > >>> We do uses scala classes (strongly typed classes), protobuf is only used >>> on the wire. For on the wire encoding we prefer protobufs for size, >>> expressiveness and ability to represent different data types. >>> >>> 3) >>> >>> I think the DataStream Java API should be supported as a first class >>> > citizens for this library. >>> >>> >>> I agree. It should be either first priority or a next thing to do. >>> >>> >>> 4) >>> >>> For the integration with the DataStream API, we could provide an API that >>> > receives (typed) DataStream objects, internally constructs the >>> DataStream >>> > operators, and returns one (or more) result DataStreams. The benefit is >>> > that we don't need to change the DataStream API directly, but put a >>> library >>> > on top. The other libraries (CEP, Table, Gelly) follow this approach. >>> >>> >>> We will provide a DSL which will do jsut this. But even without the DSL >>> this is what we do with low level joins. >>> >>> >>> 5) >>> >>> > I'm skeptical about using queryable state to expose metrics. Did you >>> > consider using Flink's metrics system [1]? It is easily configurable >>> and we >>> > provided several reporters that export the metrics. >>> > >>> This of course is an option. The choice of queryable state was mostly >>> driven by a simplicity of real time integration. Any reason why metrics >>> system is netter? >>> >>> >>> Best, >>> Stavros >>> >>> On Mon, Nov 27, 2017 at 4:23 PM, Fabian Hueske <[hidden email]> >>> wrote: >>> >>> > Hi Stavros, >>> > >>> > thanks for the detailed FLIP! >>> > Model serving is an important use case and it's great to see efforts >>> to add >>> > a library for this to Flink! >>> > >>> > I've read the FLIP and would like to ask a few questions and make some >>> > suggestions. >>> > >>> > 1) Is it a strict requirement that a ML pipeline must be able to handle >>> > different input types? >>> > I understand that it makes sense to have different models for different >>> > instances of the same type, i.e., same data type but different keys. >>> Hence, >>> > the key-based joins make sense to me. However, couldn't completely >>> > different types be handled by different ML pipelines or would there be >>> > major drawbacks? >>> > >>> > 2) I think from an API point of view it would be better to not require >>> > input records to be encoded as ProtoBuf messages. Instead, the model >>> server >>> > could accept strongly-typed objects (Java/Scala) and (if necessary) >>> convert >>> > them to ProtoBuf messages internally. In case we need to support >>> different >>> > types of records (see my first point), we can introduce a Union type >>> (i.e., >>> > an n-ary Either type). I see that we need some kind of binary encoding >>> > format for the models but maybe also this can be designed to be >>> pluggable >>> > such that later other encodings can be added. >>> > >>> > 3) I think the DataStream Java API should be supported as a first class >>> > citizens for this library. >>> > >>> > 4) For the integration with the DataStream API, we could provide an API >>> > that receives (typed) DataStream objects, internally constructs the >>> > DataStream operators, and returns one (or more) result DataStreams. The >>> > benefit is that we don't need to change the DataStream API directly, >>> but >>> > put a library on top. The other libraries (CEP, Table, Gelly) follow >>> this >>> > approach. >>> > >>> > 5) I'm skeptical about using queryable state to expose metrics. Did you >>> > consider using Flink's metrics system [1]? It is easily configurable >>> and we >>> > provided several reporters that export the metrics. >>> > >>> > What do you think? >>> > Best, Fabian >>> > >>> > [1] >>> > https://ci.apache.org/projects/flink/flink-docs-release-1.3/ >>> monitoring/ >>> > metrics.html >>> > >>> > 2017-11-23 12:32 GMT+01:00 Stavros Kontopoulos < >>> [hidden email]>: >>> > >>> > > Hi guys, >>> > > >>> > > Let's discuss the new FLIP proposal for model serving over Flink. The >>> > idea >>> > > is to combine previous efforts there and provide a library on top of >>> > Flink >>> > > for serving models. >>> > > >>> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP- >>> > 23+-+Model+Serving >>> > > >>> > > Code from previous efforts can be found here: >>> https://github.com/FlinkML >>> > > >>> > > Best, >>> > > Stavros >>> > > >>> > >>> >> >> >> > |
I'm currently looking over it, but one thing that stood out was that the
FLIP proposes to use queryable state as a monitoring solution. Given that we have a metric system that integrates with plenty of commonly used metric backends this doesn't really make sense to me. Storing them in state still has value in terms of fault-tolerance though, since this is something that the metric system doesn't provide by itself. On 18.01.2018 13:57, Fabian Hueske wrote: > Are there any more comments on the FLIP? > > Otherwise, I'd suggest to move the FLIP to the accepted FLIPs [1] and > continue with the implementation. > > Also, is there a committer who'd like to shepherd the FLIP and review the > corresponding PRs? > Of course, everybody is welcome to review the code but we need at least one > committer who will eventually merge the changes. > > Best, > Fabian > > [1] > https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals > > 2017-12-04 10:54 GMT+01:00 Fabian Hueske <[hidden email]>: > >> Hi, >> >> Sorry for the late follow up. >> >> I think I understand the motivation for choosing ProtoBuf as the >> representation and serialization format and this makes sense to me. >> >> However, it might be a good idea to provide tooling to convert Flink types >> (described as TypeInformation) to ProtoBuf. >> Otherwise, users of the model serving library would need to manually >> convert their data types (say Scala tuples, case classes, or Avro Pojos) to >> ProtoBuf messages. >> I don't think that this needs to be included in the first version but it >> might be a good extension to make the library easier to use. >> >> Best, >> Fabian >> >> >> >> 2017-11-28 17:22 GMT+01:00 Boris Lublinsky <[hidden email]> >> : >> >>> Thanks Fabian, >>> More below >>> >>> >>> >>> Boris Lublinsky >>> FDP Architect >>> [hidden email] >>> https://www.lightbend.com/ >>> >>> On Nov 28, 2017, at 8:21 AM, Fabian Hueske <[hidden email]> wrote: >>> >>> Hi Boris and Stavros, >>> >>> Thanks for the responses. >>> >>> Ad 1) Thanks for the clarification. I think I misunderstood this part of >>> the proposal. >>> I interpreted the argument why to chose ProtoBuf for network encoding ("ability >>> to represent different data types") such that different a model pipeline >>> should work on different data types. >>> I agree that it should be possible to give records of the same type (but >>> with different keys) to different models. The key-based join approach looks >>> good to me. >>> >>> Ad 2) I understand that ProtoBuf is a good choice to serialize models for >>> the given reasons. >>> However, the choice of ProtoBuf serialization for the records might make >>> the integration with existing libraries and also regular DataStream >>> programs more difficult. >>> They all use Flink's TypeSerializer system to serialize and deserialize >>> records by default. Hence, we would need to add a conversion step before >>> records can be passed to a model serving operator. >>> Are you expecting some common format that all records follow (such as a >>> Row or Vector type) or do you plan to support arbitrary records such as >>> Pojos? >>> If you plan for a specific type, you could add a TypeInformation for this >>> type with a TypeSerializer that is based on ProtoBuf. >>> >>> The way I look at it is slightly different. The common format for >>> records, supported by Flink, is Byte array with a little bit of header, >>> describing data type and is used for routing. The actual unmarshalling is >>> done by the model implementation itself. This provides the maximum >>> flexibility and gives user the freedom to create his own types without >>> breaking underlying framework. >>> >>> Ad 4) @Boris: I made this point not about the serialization format but >>> how the library would integrate with Flink's DataStream API. >>> I thought I had seen a code snippet that showed a new method on the >>> DataStream object but cannot find this anymore. >>> So, I just wanted to make the point that we should not change the >>> DataStream API (unless it lacks support for some features) and built the >>> model serving library on top of it. >>> But I get from Stavros answer that this is your design anyway. >>> >>> Ad 5) The metrics system is the default way to expose system and job >>> metrics in Flink. Due to the pluggable reporter interface and various >>> reporters, they can be easily integrated in many production environments. >>> A solution based on queryable state will always need custom code to >>> access the information. Of course this can be an optional feature. >>> >>> What do others think about this proposal? >>> >>> We had agreement among work group - Eron, Bas, Andrea, etc, but you are >>> the first one outside of it. My book https://www.lightbend.com >>> /blog/serving-machine-learning-models-free-oreilly-ebook-from-lightbend has >>> a reasonably good reviews, so we are hoping this will work >>> >>> >>> Best, Fabian >>> >>> >>> 2017-11-28 13:53 GMT+01:00 Stavros Kontopoulos <[hidden email]> >>> : >>> >>>> Hi Fabian thanx! >>>> >>>> >>>>> 1) Is it a strict requirement that a ML pipeline must be able to handle >>>>> different input types? >>>>> I understand that it makes sense to have different models for different >>>>> instances of the same type, i.e., same data type but different keys. >>>> Hence, >>>>> the key-based joins make sense to me. However, couldn't completely >>>>> different types be handled by different ML pipelines or would there be >>>>> major drawbacks? >>>> >>>> Could you elaborate more on this? Right now we only use keys when we do >>>> the >>>> join. A given pipeline can handle only a well defined type (the type can >>>> be >>>> a simple string with a custom value, no need to be a >>>> class type) which serves as a key. >>>> >>>> 2) >>>> >>>> I think from an API point of view it would be better to not require >>>>> input records to be encoded as ProtoBuf messages. Instead, the model >>>> server >>>>> could accept strongly-typed objects (Java/Scala) and (if necessary) >>>> convert >>>>> them to ProtoBuf messages internally. In case we need to support >>>> different >>>>> types of records (see my first point), we can introduce a Union type >>>> (i.e., >>>>> an n-ary Either type). I see that we need some kind of binary encoding >>>>> format for the models but maybe also this can be designed to be >>>> pluggable >>>>> such that later other encodings can be added. >>>>> >>>> We do uses scala classes (strongly typed classes), protobuf is only used >>>> on the wire. For on the wire encoding we prefer protobufs for size, >>>> expressiveness and ability to represent different data types. >>>> >>>> 3) >>>> >>>> I think the DataStream Java API should be supported as a first class >>>>> citizens for this library. >>>> >>>> I agree. It should be either first priority or a next thing to do. >>>> >>>> >>>> 4) >>>> >>>> For the integration with the DataStream API, we could provide an API that >>>>> receives (typed) DataStream objects, internally constructs the >>>> DataStream >>>>> operators, and returns one (or more) result DataStreams. The benefit is >>>>> that we don't need to change the DataStream API directly, but put a >>>> library >>>>> on top. The other libraries (CEP, Table, Gelly) follow this approach. >>>> >>>> We will provide a DSL which will do jsut this. But even without the DSL >>>> this is what we do with low level joins. >>>> >>>> >>>> 5) >>>> >>>>> I'm skeptical about using queryable state to expose metrics. Did you >>>>> consider using Flink's metrics system [1]? It is easily configurable >>>> and we >>>>> provided several reporters that export the metrics. >>>>> >>>> This of course is an option. The choice of queryable state was mostly >>>> driven by a simplicity of real time integration. Any reason why metrics >>>> system is netter? >>>> >>>> >>>> Best, >>>> Stavros >>>> >>>> On Mon, Nov 27, 2017 at 4:23 PM, Fabian Hueske <[hidden email]> >>>> wrote: >>>> >>>>> Hi Stavros, >>>>> >>>>> thanks for the detailed FLIP! >>>>> Model serving is an important use case and it's great to see efforts >>>> to add >>>>> a library for this to Flink! >>>>> >>>>> I've read the FLIP and would like to ask a few questions and make some >>>>> suggestions. >>>>> >>>>> 1) Is it a strict requirement that a ML pipeline must be able to handle >>>>> different input types? >>>>> I understand that it makes sense to have different models for different >>>>> instances of the same type, i.e., same data type but different keys. >>>> Hence, >>>>> the key-based joins make sense to me. However, couldn't completely >>>>> different types be handled by different ML pipelines or would there be >>>>> major drawbacks? >>>>> >>>>> 2) I think from an API point of view it would be better to not require >>>>> input records to be encoded as ProtoBuf messages. Instead, the model >>>> server >>>>> could accept strongly-typed objects (Java/Scala) and (if necessary) >>>> convert >>>>> them to ProtoBuf messages internally. In case we need to support >>>> different >>>>> types of records (see my first point), we can introduce a Union type >>>> (i.e., >>>>> an n-ary Either type). I see that we need some kind of binary encoding >>>>> format for the models but maybe also this can be designed to be >>>> pluggable >>>>> such that later other encodings can be added. >>>>> >>>>> 3) I think the DataStream Java API should be supported as a first class >>>>> citizens for this library. >>>>> >>>>> 4) For the integration with the DataStream API, we could provide an API >>>>> that receives (typed) DataStream objects, internally constructs the >>>>> DataStream operators, and returns one (or more) result DataStreams. The >>>>> benefit is that we don't need to change the DataStream API directly, >>>> but >>>>> put a library on top. The other libraries (CEP, Table, Gelly) follow >>>> this >>>>> approach. >>>>> >>>>> 5) I'm skeptical about using queryable state to expose metrics. Did you >>>>> consider using Flink's metrics system [1]? It is easily configurable >>>> and we >>>>> provided several reporters that export the metrics. >>>>> >>>>> What do you think? >>>>> Best, Fabian >>>>> >>>>> [1] >>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/ >>>> monitoring/ >>>>> metrics.html >>>>> >>>>> 2017-11-23 12:32 GMT+01:00 Stavros Kontopoulos < >>>> [hidden email]>: >>>>>> Hi guys, >>>>>> >>>>>> Let's discuss the new FLIP proposal for model serving over Flink. The >>>>> idea >>>>>> is to combine previous efforts there and provide a library on top of >>>>> Flink >>>>>> for serving models. >>>>>> >>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP- >>>>> 23+-+Model+Serving >>>>>> Code from previous efforts can be found here: >>>> https://github.com/FlinkML >>>>>> Best, >>>>>> Stavros >>>>>> >>> >>> |
OK, I think there was plenty of time to comment on this FLIP.
I'll move it to the ACCEPTED status. @Stavros, please consider the feedback regarding the metrics. I agree with Chesnay that metrics should be primarily exposed via the metrics system. Storing them in state makes them fault-tolerant and queryable if the state is properly configured. Thanks, Fabian 2018-01-22 17:19 GMT+01:00 Chesnay Schepler <[hidden email]>: > I'm currently looking over it, but one thing that stood out was that the > FLIP proposes to use queryable state > as a monitoring solution. Given that we have a metric system that > integrates with plenty of commonly used > metric backends this doesn't really make sense to me. > > Storing them in state still has value in terms of fault-tolerance though, > since this is something that the metric > system doesn't provide by itself. > > > On 18.01.2018 13:57, Fabian Hueske wrote: > >> Are there any more comments on the FLIP? >> >> Otherwise, I'd suggest to move the FLIP to the accepted FLIPs [1] and >> continue with the implementation. >> >> Also, is there a committer who'd like to shepherd the FLIP and review the >> corresponding PRs? >> Of course, everybody is welcome to review the code but we need at least >> one >> committer who will eventually merge the changes. >> >> Best, >> Fabian >> >> [1] >> https://cwiki.apache.org/confluence/display/FLINK/Flink+ >> Improvement+Proposals >> >> 2017-12-04 10:54 GMT+01:00 Fabian Hueske <[hidden email]>: >> >> Hi, >>> >>> Sorry for the late follow up. >>> >>> I think I understand the motivation for choosing ProtoBuf as the >>> representation and serialization format and this makes sense to me. >>> >>> However, it might be a good idea to provide tooling to convert Flink >>> types >>> (described as TypeInformation) to ProtoBuf. >>> Otherwise, users of the model serving library would need to manually >>> convert their data types (say Scala tuples, case classes, or Avro Pojos) >>> to >>> ProtoBuf messages. >>> I don't think that this needs to be included in the first version but it >>> might be a good extension to make the library easier to use. >>> >>> Best, >>> Fabian >>> >>> >>> >>> 2017-11-28 17:22 GMT+01:00 Boris Lublinsky < >>> [hidden email]> >>> : >>> >>> Thanks Fabian, >>>> More below >>>> >>>> >>>> >>>> Boris Lublinsky >>>> FDP Architect >>>> [hidden email] >>>> https://www.lightbend.com/ >>>> >>>> On Nov 28, 2017, at 8:21 AM, Fabian Hueske <[hidden email]> wrote: >>>> >>>> Hi Boris and Stavros, >>>> >>>> Thanks for the responses. >>>> >>>> Ad 1) Thanks for the clarification. I think I misunderstood this part of >>>> the proposal. >>>> I interpreted the argument why to chose ProtoBuf for network encoding >>>> ("ability >>>> to represent different data types") such that different a model pipeline >>>> should work on different data types. >>>> I agree that it should be possible to give records of the same type (but >>>> with different keys) to different models. The key-based join approach >>>> looks >>>> good to me. >>>> >>>> Ad 2) I understand that ProtoBuf is a good choice to serialize models >>>> for >>>> the given reasons. >>>> However, the choice of ProtoBuf serialization for the records might make >>>> the integration with existing libraries and also regular DataStream >>>> programs more difficult. >>>> They all use Flink's TypeSerializer system to serialize and deserialize >>>> records by default. Hence, we would need to add a conversion step before >>>> records can be passed to a model serving operator. >>>> Are you expecting some common format that all records follow (such as a >>>> Row or Vector type) or do you plan to support arbitrary records such as >>>> Pojos? >>>> If you plan for a specific type, you could add a TypeInformation for >>>> this >>>> type with a TypeSerializer that is based on ProtoBuf. >>>> >>>> The way I look at it is slightly different. The common format for >>>> records, supported by Flink, is Byte array with a little bit of header, >>>> describing data type and is used for routing. The actual unmarshalling >>>> is >>>> done by the model implementation itself. This provides the maximum >>>> flexibility and gives user the freedom to create his own types without >>>> breaking underlying framework. >>>> >>>> Ad 4) @Boris: I made this point not about the serialization format but >>>> how the library would integrate with Flink's DataStream API. >>>> I thought I had seen a code snippet that showed a new method on the >>>> DataStream object but cannot find this anymore. >>>> So, I just wanted to make the point that we should not change the >>>> DataStream API (unless it lacks support for some features) and built the >>>> model serving library on top of it. >>>> But I get from Stavros answer that this is your design anyway. >>>> >>>> Ad 5) The metrics system is the default way to expose system and job >>>> metrics in Flink. Due to the pluggable reporter interface and various >>>> reporters, they can be easily integrated in many production >>>> environments. >>>> A solution based on queryable state will always need custom code to >>>> access the information. Of course this can be an optional feature. >>>> >>>> What do others think about this proposal? >>>> >>>> We had agreement among work group - Eron, Bas, Andrea, etc, but you are >>>> the first one outside of it. My book https://www.lightbend.com >>>> /blog/serving-machine-learning-models-free-oreilly-ebook-from-lightbend >>>> has >>>> >>>> a reasonably good reviews, so we are hoping this will work >>>> >>>> >>>> Best, Fabian >>>> >>>> >>>> 2017-11-28 13:53 GMT+01:00 Stavros Kontopoulos < >>>> [hidden email]> >>>> : >>>> >>>> Hi Fabian thanx! >>>>> >>>>> >>>>> 1) Is it a strict requirement that a ML pipeline must be able to handle >>>>>> different input types? >>>>>> I understand that it makes sense to have different models for >>>>>> different >>>>>> instances of the same type, i.e., same data type but different keys. >>>>>> >>>>> Hence, >>>>> >>>>>> the key-based joins make sense to me. However, couldn't completely >>>>>> different types be handled by different ML pipelines or would there be >>>>>> major drawbacks? >>>>>> >>>>> >>>>> Could you elaborate more on this? Right now we only use keys when we do >>>>> the >>>>> join. A given pipeline can handle only a well defined type (the type >>>>> can >>>>> be >>>>> a simple string with a custom value, no need to be a >>>>> class type) which serves as a key. >>>>> >>>>> 2) >>>>> >>>>> I think from an API point of view it would be better to not require >>>>> >>>>>> input records to be encoded as ProtoBuf messages. Instead, the model >>>>>> >>>>> server >>>>> >>>>>> could accept strongly-typed objects (Java/Scala) and (if necessary) >>>>>> >>>>> convert >>>>> >>>>>> them to ProtoBuf messages internally. In case we need to support >>>>>> >>>>> different >>>>> >>>>>> types of records (see my first point), we can introduce a Union type >>>>>> >>>>> (i.e., >>>>> >>>>>> an n-ary Either type). I see that we need some kind of binary encoding >>>>>> format for the models but maybe also this can be designed to be >>>>>> >>>>> pluggable >>>>> >>>>>> such that later other encodings can be added. >>>>>> >>>>>> We do uses scala classes (strongly typed classes), protobuf is only >>>>> used >>>>> on the wire. For on the wire encoding we prefer protobufs for size, >>>>> expressiveness and ability to represent different data types. >>>>> >>>>> 3) >>>>> >>>>> I think the DataStream Java API should be supported as a first class >>>>> >>>>>> citizens for this library. >>>>>> >>>>> >>>>> I agree. It should be either first priority or a next thing to do. >>>>> >>>>> >>>>> 4) >>>>> >>>>> For the integration with the DataStream API, we could provide an API >>>>> that >>>>> >>>>>> receives (typed) DataStream objects, internally constructs the >>>>>> >>>>> DataStream >>>>> >>>>>> operators, and returns one (or more) result DataStreams. The benefit >>>>>> is >>>>>> that we don't need to change the DataStream API directly, but put a >>>>>> >>>>> library >>>>> >>>>>> on top. The other libraries (CEP, Table, Gelly) follow this approach. >>>>>> >>>>> >>>>> We will provide a DSL which will do jsut this. But even without the >>>>> DSL >>>>> this is what we do with low level joins. >>>>> >>>>> >>>>> 5) >>>>> >>>>> I'm skeptical about using queryable state to expose metrics. Did you >>>>>> consider using Flink's metrics system [1]? It is easily configurable >>>>>> >>>>> and we >>>>> >>>>>> provided several reporters that export the metrics. >>>>>> >>>>>> This of course is an option. The choice of queryable state was mostly >>>>> driven by a simplicity of real time integration. Any reason why >>>>> metrics >>>>> system is netter? >>>>> >>>>> >>>>> Best, >>>>> Stavros >>>>> >>>>> On Mon, Nov 27, 2017 at 4:23 PM, Fabian Hueske <[hidden email]> >>>>> wrote: >>>>> >>>>> Hi Stavros, >>>>>> >>>>>> thanks for the detailed FLIP! >>>>>> Model serving is an important use case and it's great to see efforts >>>>>> >>>>> to add >>>>> >>>>>> a library for this to Flink! >>>>>> >>>>>> I've read the FLIP and would like to ask a few questions and make some >>>>>> suggestions. >>>>>> >>>>>> 1) Is it a strict requirement that a ML pipeline must be able to >>>>>> handle >>>>>> different input types? >>>>>> I understand that it makes sense to have different models for >>>>>> different >>>>>> instances of the same type, i.e., same data type but different keys. >>>>>> >>>>> Hence, >>>>> >>>>>> the key-based joins make sense to me. However, couldn't completely >>>>>> different types be handled by different ML pipelines or would there be >>>>>> major drawbacks? >>>>>> >>>>>> 2) I think from an API point of view it would be better to not require >>>>>> input records to be encoded as ProtoBuf messages. Instead, the model >>>>>> >>>>> server >>>>> >>>>>> could accept strongly-typed objects (Java/Scala) and (if necessary) >>>>>> >>>>> convert >>>>> >>>>>> them to ProtoBuf messages internally. In case we need to support >>>>>> >>>>> different >>>>> >>>>>> types of records (see my first point), we can introduce a Union type >>>>>> >>>>> (i.e., >>>>> >>>>>> an n-ary Either type). I see that we need some kind of binary encoding >>>>>> format for the models but maybe also this can be designed to be >>>>>> >>>>> pluggable >>>>> >>>>>> such that later other encodings can be added. >>>>>> >>>>>> 3) I think the DataStream Java API should be supported as a first >>>>>> class >>>>>> citizens for this library. >>>>>> >>>>>> 4) For the integration with the DataStream API, we could provide an >>>>>> API >>>>>> that receives (typed) DataStream objects, internally constructs the >>>>>> DataStream operators, and returns one (or more) result DataStreams. >>>>>> The >>>>>> benefit is that we don't need to change the DataStream API directly, >>>>>> >>>>> but >>>>> >>>>>> put a library on top. The other libraries (CEP, Table, Gelly) follow >>>>>> >>>>> this >>>>> >>>>>> approach. >>>>>> >>>>>> 5) I'm skeptical about using queryable state to expose metrics. Did >>>>>> you >>>>>> consider using Flink's metrics system [1]? It is easily configurable >>>>>> >>>>> and we >>>>> >>>>>> provided several reporters that export the metrics. >>>>>> >>>>>> What do you think? >>>>>> Best, Fabian >>>>>> >>>>>> [1] >>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/ >>>>>> >>>>> monitoring/ >>>>> >>>>>> metrics.html >>>>>> >>>>>> 2017-11-23 12:32 GMT+01:00 Stavros Kontopoulos < >>>>>> >>>>> [hidden email]>: >>>>> >>>>>> Hi guys, >>>>>>> >>>>>>> Let's discuss the new FLIP proposal for model serving over Flink. The >>>>>>> >>>>>> idea >>>>>> >>>>>>> is to combine previous efforts there and provide a library on top of >>>>>>> >>>>>> Flink >>>>>> >>>>>>> for serving models. >>>>>>> >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP- >>>>>>> >>>>>> 23+-+Model+Serving >>>>>> >>>>>>> Code from previous efforts can be found here: >>>>>>> >>>>>> https://github.com/FlinkML >>>>> >>>>>> Best, >>>>>>> Stavros >>>>>>> >>>>>>> >>>> >>>> > |
Thanx @Fabian. I will update the document accordingly wrt metrics.
I agree there are pros and cons. Best, Stavros On Wed, Jan 31, 2018 at 1:07 AM, Fabian Hueske <[hidden email]> wrote: > OK, I think there was plenty of time to comment on this FLIP. > I'll move it to the ACCEPTED status. > > @Stavros, please consider the feedback regarding the metrics. > I agree with Chesnay that metrics should be primarily exposed via the > metrics system. > Storing them in state makes them fault-tolerant and queryable if the state > is properly configured. > > Thanks, > Fabian > > 2018-01-22 17:19 GMT+01:00 Chesnay Schepler <[hidden email]>: > > > I'm currently looking over it, but one thing that stood out was that the > > FLIP proposes to use queryable state > > as a monitoring solution. Given that we have a metric system that > > integrates with plenty of commonly used > > metric backends this doesn't really make sense to me. > > > > Storing them in state still has value in terms of fault-tolerance though, > > since this is something that the metric > > system doesn't provide by itself. > > > > > > On 18.01.2018 13:57, Fabian Hueske wrote: > > > >> Are there any more comments on the FLIP? > >> > >> Otherwise, I'd suggest to move the FLIP to the accepted FLIPs [1] and > >> continue with the implementation. > >> > >> Also, is there a committer who'd like to shepherd the FLIP and review > the > >> corresponding PRs? > >> Of course, everybody is welcome to review the code but we need at least > >> one > >> committer who will eventually merge the changes. > >> > >> Best, > >> Fabian > >> > >> [1] > >> https://cwiki.apache.org/confluence/display/FLINK/Flink+ > >> Improvement+Proposals > >> > >> 2017-12-04 10:54 GMT+01:00 Fabian Hueske <[hidden email]>: > >> > >> Hi, > >>> > >>> Sorry for the late follow up. > >>> > >>> I think I understand the motivation for choosing ProtoBuf as the > >>> representation and serialization format and this makes sense to me. > >>> > >>> However, it might be a good idea to provide tooling to convert Flink > >>> types > >>> (described as TypeInformation) to ProtoBuf. > >>> Otherwise, users of the model serving library would need to manually > >>> convert their data types (say Scala tuples, case classes, or Avro > Pojos) > >>> to > >>> ProtoBuf messages. > >>> I don't think that this needs to be included in the first version but > it > >>> might be a good extension to make the library easier to use. > >>> > >>> Best, > >>> Fabian > >>> > >>> > >>> > >>> 2017-11-28 17:22 GMT+01:00 Boris Lublinsky < > >>> [hidden email]> > >>> : > >>> > >>> Thanks Fabian, > >>>> More below > >>>> > >>>> > >>>> > >>>> Boris Lublinsky > >>>> FDP Architect > >>>> [hidden email] > >>>> https://www.lightbend.com/ > >>>> > >>>> On Nov 28, 2017, at 8:21 AM, Fabian Hueske <[hidden email]> wrote: > >>>> > >>>> Hi Boris and Stavros, > >>>> > >>>> Thanks for the responses. > >>>> > >>>> Ad 1) Thanks for the clarification. I think I misunderstood this part > of > >>>> the proposal. > >>>> I interpreted the argument why to chose ProtoBuf for network encoding > >>>> ("ability > >>>> to represent different data types") such that different a model > pipeline > >>>> should work on different data types. > >>>> I agree that it should be possible to give records of the same type > (but > >>>> with different keys) to different models. The key-based join approach > >>>> looks > >>>> good to me. > >>>> > >>>> Ad 2) I understand that ProtoBuf is a good choice to serialize models > >>>> for > >>>> the given reasons. > >>>> However, the choice of ProtoBuf serialization for the records might > make > >>>> the integration with existing libraries and also regular DataStream > >>>> programs more difficult. > >>>> They all use Flink's TypeSerializer system to serialize and > deserialize > >>>> records by default. Hence, we would need to add a conversion step > before > >>>> records can be passed to a model serving operator. > >>>> Are you expecting some common format that all records follow (such as > a > >>>> Row or Vector type) or do you plan to support arbitrary records such > as > >>>> Pojos? > >>>> If you plan for a specific type, you could add a TypeInformation for > >>>> this > >>>> type with a TypeSerializer that is based on ProtoBuf. > >>>> > >>>> The way I look at it is slightly different. The common format for > >>>> records, supported by Flink, is Byte array with a little bit of > header, > >>>> describing data type and is used for routing. The actual unmarshalling > >>>> is > >>>> done by the model implementation itself. This provides the maximum > >>>> flexibility and gives user the freedom to create his own types without > >>>> breaking underlying framework. > >>>> > >>>> Ad 4) @Boris: I made this point not about the serialization format but > >>>> how the library would integrate with Flink's DataStream API. > >>>> I thought I had seen a code snippet that showed a new method on the > >>>> DataStream object but cannot find this anymore. > >>>> So, I just wanted to make the point that we should not change the > >>>> DataStream API (unless it lacks support for some features) and built > the > >>>> model serving library on top of it. > >>>> But I get from Stavros answer that this is your design anyway. > >>>> > >>>> Ad 5) The metrics system is the default way to expose system and job > >>>> metrics in Flink. Due to the pluggable reporter interface and various > >>>> reporters, they can be easily integrated in many production > >>>> environments. > >>>> A solution based on queryable state will always need custom code to > >>>> access the information. Of course this can be an optional feature. > >>>> > >>>> What do others think about this proposal? > >>>> > >>>> We had agreement among work group - Eron, Bas, Andrea, etc, but you > are > >>>> the first one outside of it. My book https://www.lightbend.com > >>>> /blog/serving-machine-learning-models-free-oreilly- > ebook-from-lightbend > >>>> has > >>>> > >>>> a reasonably good reviews, so we are hoping this will work > >>>> > >>>> > >>>> Best, Fabian > >>>> > >>>> > >>>> 2017-11-28 13:53 GMT+01:00 Stavros Kontopoulos < > >>>> [hidden email]> > >>>> : > >>>> > >>>> Hi Fabian thanx! > >>>>> > >>>>> > >>>>> 1) Is it a strict requirement that a ML pipeline must be able to > handle > >>>>>> different input types? > >>>>>> I understand that it makes sense to have different models for > >>>>>> different > >>>>>> instances of the same type, i.e., same data type but different keys. > >>>>>> > >>>>> Hence, > >>>>> > >>>>>> the key-based joins make sense to me. However, couldn't completely > >>>>>> different types be handled by different ML pipelines or would there > be > >>>>>> major drawbacks? > >>>>>> > >>>>> > >>>>> Could you elaborate more on this? Right now we only use keys when we > do > >>>>> the > >>>>> join. A given pipeline can handle only a well defined type (the type > >>>>> can > >>>>> be > >>>>> a simple string with a custom value, no need to be a > >>>>> class type) which serves as a key. > >>>>> > >>>>> 2) > >>>>> > >>>>> I think from an API point of view it would be better to not require > >>>>> > >>>>>> input records to be encoded as ProtoBuf messages. Instead, the model > >>>>>> > >>>>> server > >>>>> > >>>>>> could accept strongly-typed objects (Java/Scala) and (if necessary) > >>>>>> > >>>>> convert > >>>>> > >>>>>> them to ProtoBuf messages internally. In case we need to support > >>>>>> > >>>>> different > >>>>> > >>>>>> types of records (see my first point), we can introduce a Union type > >>>>>> > >>>>> (i.e., > >>>>> > >>>>>> an n-ary Either type). I see that we need some kind of binary > encoding > >>>>>> format for the models but maybe also this can be designed to be > >>>>>> > >>>>> pluggable > >>>>> > >>>>>> such that later other encodings can be added. > >>>>>> > >>>>>> We do uses scala classes (strongly typed classes), protobuf is > only > >>>>> used > >>>>> on the wire. For on the wire encoding we prefer protobufs for size, > >>>>> expressiveness and ability to represent different data types. > >>>>> > >>>>> 3) > >>>>> > >>>>> I think the DataStream Java API should be supported as a first class > >>>>> > >>>>>> citizens for this library. > >>>>>> > >>>>> > >>>>> I agree. It should be either first priority or a next thing to do. > >>>>> > >>>>> > >>>>> 4) > >>>>> > >>>>> For the integration with the DataStream API, we could provide an API > >>>>> that > >>>>> > >>>>>> receives (typed) DataStream objects, internally constructs the > >>>>>> > >>>>> DataStream > >>>>> > >>>>>> operators, and returns one (or more) result DataStreams. The benefit > >>>>>> is > >>>>>> that we don't need to change the DataStream API directly, but put a > >>>>>> > >>>>> library > >>>>> > >>>>>> on top. The other libraries (CEP, Table, Gelly) follow this > approach. > >>>>>> > >>>>> > >>>>> We will provide a DSL which will do jsut this. But even without the > >>>>> DSL > >>>>> this is what we do with low level joins. > >>>>> > >>>>> > >>>>> 5) > >>>>> > >>>>> I'm skeptical about using queryable state to expose metrics. Did you > >>>>>> consider using Flink's metrics system [1]? It is easily configurable > >>>>>> > >>>>> and we > >>>>> > >>>>>> provided several reporters that export the metrics. > >>>>>> > >>>>>> This of course is an option. The choice of queryable state was > mostly > >>>>> driven by a simplicity of real time integration. Any reason why > >>>>> metrics > >>>>> system is netter? > >>>>> > >>>>> > >>>>> Best, > >>>>> Stavros > >>>>> > >>>>> On Mon, Nov 27, 2017 at 4:23 PM, Fabian Hueske <[hidden email]> > >>>>> wrote: > >>>>> > >>>>> Hi Stavros, > >>>>>> > >>>>>> thanks for the detailed FLIP! > >>>>>> Model serving is an important use case and it's great to see efforts > >>>>>> > >>>>> to add > >>>>> > >>>>>> a library for this to Flink! > >>>>>> > >>>>>> I've read the FLIP and would like to ask a few questions and make > some > >>>>>> suggestions. > >>>>>> > >>>>>> 1) Is it a strict requirement that a ML pipeline must be able to > >>>>>> handle > >>>>>> different input types? > >>>>>> I understand that it makes sense to have different models for > >>>>>> different > >>>>>> instances of the same type, i.e., same data type but different keys. > >>>>>> > >>>>> Hence, > >>>>> > >>>>>> the key-based joins make sense to me. However, couldn't completely > >>>>>> different types be handled by different ML pipelines or would there > be > >>>>>> major drawbacks? > >>>>>> > >>>>>> 2) I think from an API point of view it would be better to not > require > >>>>>> input records to be encoded as ProtoBuf messages. Instead, the model > >>>>>> > >>>>> server > >>>>> > >>>>>> could accept strongly-typed objects (Java/Scala) and (if necessary) > >>>>>> > >>>>> convert > >>>>> > >>>>>> them to ProtoBuf messages internally. In case we need to support > >>>>>> > >>>>> different > >>>>> > >>>>>> types of records (see my first point), we can introduce a Union type > >>>>>> > >>>>> (i.e., > >>>>> > >>>>>> an n-ary Either type). I see that we need some kind of binary > encoding > >>>>>> format for the models but maybe also this can be designed to be > >>>>>> > >>>>> pluggable > >>>>> > >>>>>> such that later other encodings can be added. > >>>>>> > >>>>>> 3) I think the DataStream Java API should be supported as a first > >>>>>> class > >>>>>> citizens for this library. > >>>>>> > >>>>>> 4) For the integration with the DataStream API, we could provide an > >>>>>> API > >>>>>> that receives (typed) DataStream objects, internally constructs the > >>>>>> DataStream operators, and returns one (or more) result DataStreams. > >>>>>> The > >>>>>> benefit is that we don't need to change the DataStream API directly, > >>>>>> > >>>>> but > >>>>> > >>>>>> put a library on top. The other libraries (CEP, Table, Gelly) follow > >>>>>> > >>>>> this > >>>>> > >>>>>> approach. > >>>>>> > >>>>>> 5) I'm skeptical about using queryable state to expose metrics. Did > >>>>>> you > >>>>>> consider using Flink's metrics system [1]? It is easily configurable > >>>>>> > >>>>> and we > >>>>> > >>>>>> provided several reporters that export the metrics. > >>>>>> > >>>>>> What do you think? > >>>>>> Best, Fabian > >>>>>> > >>>>>> [1] > >>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/ > >>>>>> > >>>>> monitoring/ > >>>>> > >>>>>> metrics.html > >>>>>> > >>>>>> 2017-11-23 12:32 GMT+01:00 Stavros Kontopoulos < > >>>>>> > >>>>> [hidden email]>: > >>>>> > >>>>>> Hi guys, > >>>>>>> > >>>>>>> Let's discuss the new FLIP proposal for model serving over Flink. > The > >>>>>>> > >>>>>> idea > >>>>>> > >>>>>>> is to combine previous efforts there and provide a library on top > of > >>>>>>> > >>>>>> Flink > >>>>>> > >>>>>>> for serving models. > >>>>>>> > >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP- > >>>>>>> > >>>>>> 23+-+Model+Serving > >>>>>> > >>>>>>> Code from previous efforts can be found here: > >>>>>>> > >>>>>> https://github.com/FlinkML > >>>>> > >>>>>> Best, > >>>>>>> Stavros > >>>>>>> > >>>>>>> > >>>> > >>>> > > > |
Hi everybody,
The question of how to serve ML models in Flink applications came up in several conversations I had with Flink users in the last months. Recently, Boris approached me and he told me that he'd like to revive the efforts around FLIP-23 [1]. In the last days, Boris extended the proposal by a speculative model evaluation which allows for evaluating multiple modes of varying complexity to ensure certain SLAs. The code does already exist in a Github repository [2]. Due to the frequent user requests and the fact that the code is already present, I think would be a great feature for Flink to have. Since this is a library on top of Flink's existing APIs this should not be too hard to review. What do others think? Best, Fabian [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving [2] https://github.com/FlinkML/flink-speculative-modelServer Am Mo., 5. Feb. 2018 um 13:11 Uhr schrieb Stavros Kontopoulos < [hidden email]>: > Thanx @Fabian. I will update the document accordingly wrt metrics. > I agree there are pros and cons. > > Best, > Stavros > > > On Wed, Jan 31, 2018 at 1:07 AM, Fabian Hueske <[hidden email]> wrote: > > > OK, I think there was plenty of time to comment on this FLIP. > > I'll move it to the ACCEPTED status. > > > > @Stavros, please consider the feedback regarding the metrics. > > I agree with Chesnay that metrics should be primarily exposed via the > > metrics system. > > Storing them in state makes them fault-tolerant and queryable if the > state > > is properly configured. > > > > Thanks, > > Fabian > > > > 2018-01-22 17:19 GMT+01:00 Chesnay Schepler <[hidden email]>: > > > > > I'm currently looking over it, but one thing that stood out was that > the > > > FLIP proposes to use queryable state > > > as a monitoring solution. Given that we have a metric system that > > > integrates with plenty of commonly used > > > metric backends this doesn't really make sense to me. > > > > > > Storing them in state still has value in terms of fault-tolerance > though, > > > since this is something that the metric > > > system doesn't provide by itself. > > > > > > > > > On 18.01.2018 13:57, Fabian Hueske wrote: > > > > > >> Are there any more comments on the FLIP? > > >> > > >> Otherwise, I'd suggest to move the FLIP to the accepted FLIPs [1] and > > >> continue with the implementation. > > >> > > >> Also, is there a committer who'd like to shepherd the FLIP and review > > the > > >> corresponding PRs? > > >> Of course, everybody is welcome to review the code but we need at > least > > >> one > > >> committer who will eventually merge the changes. > > >> > > >> Best, > > >> Fabian > > >> > > >> [1] > > >> https://cwiki.apache.org/confluence/display/FLINK/Flink+ > > >> Improvement+Proposals > > >> > > >> 2017-12-04 10:54 GMT+01:00 Fabian Hueske <[hidden email]>: > > >> > > >> Hi, > > >>> > > >>> Sorry for the late follow up. > > >>> > > >>> I think I understand the motivation for choosing ProtoBuf as the > > >>> representation and serialization format and this makes sense to me. > > >>> > > >>> However, it might be a good idea to provide tooling to convert Flink > > >>> types > > >>> (described as TypeInformation) to ProtoBuf. > > >>> Otherwise, users of the model serving library would need to manually > > >>> convert their data types (say Scala tuples, case classes, or Avro > > Pojos) > > >>> to > > >>> ProtoBuf messages. > > >>> I don't think that this needs to be included in the first version but > > it > > >>> might be a good extension to make the library easier to use. > > >>> > > >>> Best, > > >>> Fabian > > >>> > > >>> > > >>> > > >>> 2017-11-28 17:22 GMT+01:00 Boris Lublinsky < > > >>> [hidden email]> > > >>> : > > >>> > > >>> Thanks Fabian, > > >>>> More below > > >>>> > > >>>> > > >>>> > > >>>> Boris Lublinsky > > >>>> FDP Architect > > >>>> [hidden email] > > >>>> https://www.lightbend.com/ > > >>>> > > >>>> On Nov 28, 2017, at 8:21 AM, Fabian Hueske <[hidden email]> > wrote: > > >>>> > > >>>> Hi Boris and Stavros, > > >>>> > > >>>> Thanks for the responses. > > >>>> > > >>>> Ad 1) Thanks for the clarification. I think I misunderstood this > part > > of > > >>>> the proposal. > > >>>> I interpreted the argument why to chose ProtoBuf for network > encoding > > >>>> ("ability > > >>>> to represent different data types") such that different a model > > pipeline > > >>>> should work on different data types. > > >>>> I agree that it should be possible to give records of the same type > > (but > > >>>> with different keys) to different models. The key-based join > approach > > >>>> looks > > >>>> good to me. > > >>>> > > >>>> Ad 2) I understand that ProtoBuf is a good choice to serialize > models > > >>>> for > > >>>> the given reasons. > > >>>> However, the choice of ProtoBuf serialization for the records might > > make > > >>>> the integration with existing libraries and also regular DataStream > > >>>> programs more difficult. > > >>>> They all use Flink's TypeSerializer system to serialize and > > deserialize > > >>>> records by default. Hence, we would need to add a conversion step > > before > > >>>> records can be passed to a model serving operator. > > >>>> Are you expecting some common format that all records follow (such > as > > a > > >>>> Row or Vector type) or do you plan to support arbitrary records such > > as > > >>>> Pojos? > > >>>> If you plan for a specific type, you could add a TypeInformation for > > >>>> this > > >>>> type with a TypeSerializer that is based on ProtoBuf. > > >>>> > > >>>> The way I look at it is slightly different. The common format for > > >>>> records, supported by Flink, is Byte array with a little bit of > > header, > > >>>> describing data type and is used for routing. The actual > unmarshalling > > >>>> is > > >>>> done by the model implementation itself. This provides the maximum > > >>>> flexibility and gives user the freedom to create his own types > without > > >>>> breaking underlying framework. > > >>>> > > >>>> Ad 4) @Boris: I made this point not about the serialization format > but > > >>>> how the library would integrate with Flink's DataStream API. > > >>>> I thought I had seen a code snippet that showed a new method on the > > >>>> DataStream object but cannot find this anymore. > > >>>> So, I just wanted to make the point that we should not change the > > >>>> DataStream API (unless it lacks support for some features) and built > > the > > >>>> model serving library on top of it. > > >>>> But I get from Stavros answer that this is your design anyway. > > >>>> > > >>>> Ad 5) The metrics system is the default way to expose system and job > > >>>> metrics in Flink. Due to the pluggable reporter interface and > various > > >>>> reporters, they can be easily integrated in many production > > >>>> environments. > > >>>> A solution based on queryable state will always need custom code to > > >>>> access the information. Of course this can be an optional feature. > > >>>> > > >>>> What do others think about this proposal? > > >>>> > > >>>> We had agreement among work group - Eron, Bas, Andrea, etc, but you > > are > > >>>> the first one outside of it. My book https://www.lightbend.com > > >>>> /blog/serving-machine-learning-models-free-oreilly- > > ebook-from-lightbend > > >>>> has > > >>>> > > >>>> a reasonably good reviews, so we are hoping this will work > > >>>> > > >>>> > > >>>> Best, Fabian > > >>>> > > >>>> > > >>>> 2017-11-28 13:53 GMT+01:00 Stavros Kontopoulos < > > >>>> [hidden email]> > > >>>> : > > >>>> > > >>>> Hi Fabian thanx! > > >>>>> > > >>>>> > > >>>>> 1) Is it a strict requirement that a ML pipeline must be able to > > handle > > >>>>>> different input types? > > >>>>>> I understand that it makes sense to have different models for > > >>>>>> different > > >>>>>> instances of the same type, i.e., same data type but different > keys. > > >>>>>> > > >>>>> Hence, > > >>>>> > > >>>>>> the key-based joins make sense to me. However, couldn't completely > > >>>>>> different types be handled by different ML pipelines or would > there > > be > > >>>>>> major drawbacks? > > >>>>>> > > >>>>> > > >>>>> Could you elaborate more on this? Right now we only use keys when > we > > do > > >>>>> the > > >>>>> join. A given pipeline can handle only a well defined type (the > type > > >>>>> can > > >>>>> be > > >>>>> a simple string with a custom value, no need to be a > > >>>>> class type) which serves as a key. > > >>>>> > > >>>>> 2) > > >>>>> > > >>>>> I think from an API point of view it would be better to not require > > >>>>> > > >>>>>> input records to be encoded as ProtoBuf messages. Instead, the > model > > >>>>>> > > >>>>> server > > >>>>> > > >>>>>> could accept strongly-typed objects (Java/Scala) and (if > necessary) > > >>>>>> > > >>>>> convert > > >>>>> > > >>>>>> them to ProtoBuf messages internally. In case we need to support > > >>>>>> > > >>>>> different > > >>>>> > > >>>>>> types of records (see my first point), we can introduce a Union > type > > >>>>>> > > >>>>> (i.e., > > >>>>> > > >>>>>> an n-ary Either type). I see that we need some kind of binary > > encoding > > >>>>>> format for the models but maybe also this can be designed to be > > >>>>>> > > >>>>> pluggable > > >>>>> > > >>>>>> such that later other encodings can be added. > > >>>>>> > > >>>>>> We do uses scala classes (strongly typed classes), protobuf is > > only > > >>>>> used > > >>>>> on the wire. For on the wire encoding we prefer protobufs for size, > > >>>>> expressiveness and ability to represent different data types. > > >>>>> > > >>>>> 3) > > >>>>> > > >>>>> I think the DataStream Java API should be supported as a first > class > > >>>>> > > >>>>>> citizens for this library. > > >>>>>> > > >>>>> > > >>>>> I agree. It should be either first priority or a next thing to do. > > >>>>> > > >>>>> > > >>>>> 4) > > >>>>> > > >>>>> For the integration with the DataStream API, we could provide an > API > > >>>>> that > > >>>>> > > >>>>>> receives (typed) DataStream objects, internally constructs the > > >>>>>> > > >>>>> DataStream > > >>>>> > > >>>>>> operators, and returns one (or more) result DataStreams. The > benefit > > >>>>>> is > > >>>>>> that we don't need to change the DataStream API directly, but put > a > > >>>>>> > > >>>>> library > > >>>>> > > >>>>>> on top. The other libraries (CEP, Table, Gelly) follow this > > approach. > > >>>>>> > > >>>>> > > >>>>> We will provide a DSL which will do jsut this. But even without > the > > >>>>> DSL > > >>>>> this is what we do with low level joins. > > >>>>> > > >>>>> > > >>>>> 5) > > >>>>> > > >>>>> I'm skeptical about using queryable state to expose metrics. Did > you > > >>>>>> consider using Flink's metrics system [1]? It is easily > configurable > > >>>>>> > > >>>>> and we > > >>>>> > > >>>>>> provided several reporters that export the metrics. > > >>>>>> > > >>>>>> This of course is an option. The choice of queryable state was > > mostly > > >>>>> driven by a simplicity of real time integration. Any reason why > > >>>>> metrics > > >>>>> system is netter? > > >>>>> > > >>>>> > > >>>>> Best, > > >>>>> Stavros > > >>>>> > > >>>>> On Mon, Nov 27, 2017 at 4:23 PM, Fabian Hueske <[hidden email]> > > >>>>> wrote: > > >>>>> > > >>>>> Hi Stavros, > > >>>>>> > > >>>>>> thanks for the detailed FLIP! > > >>>>>> Model serving is an important use case and it's great to see > efforts > > >>>>>> > > >>>>> to add > > >>>>> > > >>>>>> a library for this to Flink! > > >>>>>> > > >>>>>> I've read the FLIP and would like to ask a few questions and make > > some > > >>>>>> suggestions. > > >>>>>> > > >>>>>> 1) Is it a strict requirement that a ML pipeline must be able to > > >>>>>> handle > > >>>>>> different input types? > > >>>>>> I understand that it makes sense to have different models for > > >>>>>> different > > >>>>>> instances of the same type, i.e., same data type but different > keys. > > >>>>>> > > >>>>> Hence, > > >>>>> > > >>>>>> the key-based joins make sense to me. However, couldn't completely > > >>>>>> different types be handled by different ML pipelines or would > there > > be > > >>>>>> major drawbacks? > > >>>>>> > > >>>>>> 2) I think from an API point of view it would be better to not > > require > > >>>>>> input records to be encoded as ProtoBuf messages. Instead, the > model > > >>>>>> > > >>>>> server > > >>>>> > > >>>>>> could accept strongly-typed objects (Java/Scala) and (if > necessary) > > >>>>>> > > >>>>> convert > > >>>>> > > >>>>>> them to ProtoBuf messages internally. In case we need to support > > >>>>>> > > >>>>> different > > >>>>> > > >>>>>> types of records (see my first point), we can introduce a Union > type > > >>>>>> > > >>>>> (i.e., > > >>>>> > > >>>>>> an n-ary Either type). I see that we need some kind of binary > > encoding > > >>>>>> format for the models but maybe also this can be designed to be > > >>>>>> > > >>>>> pluggable > > >>>>> > > >>>>>> such that later other encodings can be added. > > >>>>>> > > >>>>>> 3) I think the DataStream Java API should be supported as a first > > >>>>>> class > > >>>>>> citizens for this library. > > >>>>>> > > >>>>>> 4) For the integration with the DataStream API, we could provide > an > > >>>>>> API > > >>>>>> that receives (typed) DataStream objects, internally constructs > the > > >>>>>> DataStream operators, and returns one (or more) result > DataStreams. > > >>>>>> The > > >>>>>> benefit is that we don't need to change the DataStream API > directly, > > >>>>>> > > >>>>> but > > >>>>> > > >>>>>> put a library on top. The other libraries (CEP, Table, Gelly) > follow > > >>>>>> > > >>>>> this > > >>>>> > > >>>>>> approach. > > >>>>>> > > >>>>>> 5) I'm skeptical about using queryable state to expose metrics. > Did > > >>>>>> you > > >>>>>> consider using Flink's metrics system [1]? It is easily > configurable > > >>>>>> > > >>>>> and we > > >>>>> > > >>>>>> provided several reporters that export the metrics. > > >>>>>> > > >>>>>> What do you think? > > >>>>>> Best, Fabian > > >>>>>> > > >>>>>> [1] > > >>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/ > > >>>>>> > > >>>>> monitoring/ > > >>>>> > > >>>>>> metrics.html > > >>>>>> > > >>>>>> 2017-11-23 12:32 GMT+01:00 Stavros Kontopoulos < > > >>>>>> > > >>>>> [hidden email]>: > > >>>>> > > >>>>>> Hi guys, > > >>>>>>> > > >>>>>>> Let's discuss the new FLIP proposal for model serving over Flink. > > The > > >>>>>>> > > >>>>>> idea > > >>>>>> > > >>>>>>> is to combine previous efforts there and provide a library on top > > of > > >>>>>>> > > >>>>>> Flink > > >>>>>> > > >>>>>>> for serving models. > > >>>>>>> > > >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP- > > >>>>>>> > > >>>>>> 23+-+Model+Serving > > >>>>>> > > >>>>>>> Code from previous efforts can be found here: > > >>>>>>> > > >>>>>> https://github.com/FlinkML > > >>>>> > > >>>>>> Best, > > >>>>>>> Stavros > > >>>>>>> > > >>>>>>> > > >>>> > > >>>> > > > > > > |
Thanks for the contribution Boris!! I've been playing around with the basic
model for a while back and loved it. +1 and really looking forward to having the feature merging back to Flink ML. -- Rong On Mon, Oct 1, 2018 at 7:55 AM Fabian Hueske <[hidden email]> wrote: > Hi everybody, > > The question of how to serve ML models in Flink applications came up in > several conversations I had with Flink users in the last months. > Recently, Boris approached me and he told me that he'd like to revive the > efforts around FLIP-23 [1]. > > In the last days, Boris extended the proposal by a speculative model > evaluation which allows for evaluating multiple modes of varying complexity > to ensure certain SLAs. > The code does already exist in a Github repository [2]. > > Due to the frequent user requests and the fact that the code is already > present, I think would be a great feature for Flink to have. > Since this is a library on top of Flink's existing APIs this should not be > too hard to review. > > What do others think? > > Best, Fabian > > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving > [2] https://github.com/FlinkML/flink-speculative-modelServer > > Am Mo., 5. Feb. 2018 um 13:11 Uhr schrieb Stavros Kontopoulos < > [hidden email]>: > > > Thanx @Fabian. I will update the document accordingly wrt metrics. > > I agree there are pros and cons. > > > > Best, > > Stavros > > > > > > On Wed, Jan 31, 2018 at 1:07 AM, Fabian Hueske <[hidden email]> > wrote: > > > > > OK, I think there was plenty of time to comment on this FLIP. > > > I'll move it to the ACCEPTED status. > > > > > > @Stavros, please consider the feedback regarding the metrics. > > > I agree with Chesnay that metrics should be primarily exposed via the > > > metrics system. > > > Storing them in state makes them fault-tolerant and queryable if the > > state > > > is properly configured. > > > > > > Thanks, > > > Fabian > > > > > > 2018-01-22 17:19 GMT+01:00 Chesnay Schepler <[hidden email]>: > > > > > > > I'm currently looking over it, but one thing that stood out was that > > the > > > > FLIP proposes to use queryable state > > > > as a monitoring solution. Given that we have a metric system that > > > > integrates with plenty of commonly used > > > > metric backends this doesn't really make sense to me. > > > > > > > > Storing them in state still has value in terms of fault-tolerance > > though, > > > > since this is something that the metric > > > > system doesn't provide by itself. > > > > > > > > > > > > On 18.01.2018 13:57, Fabian Hueske wrote: > > > > > > > >> Are there any more comments on the FLIP? > > > >> > > > >> Otherwise, I'd suggest to move the FLIP to the accepted FLIPs [1] > and > > > >> continue with the implementation. > > > >> > > > >> Also, is there a committer who'd like to shepherd the FLIP and > review > > > the > > > >> corresponding PRs? > > > >> Of course, everybody is welcome to review the code but we need at > > least > > > >> one > > > >> committer who will eventually merge the changes. > > > >> > > > >> Best, > > > >> Fabian > > > >> > > > >> [1] > > > >> https://cwiki.apache.org/confluence/display/FLINK/Flink+ > > > >> Improvement+Proposals > > > >> > > > >> 2017-12-04 10:54 GMT+01:00 Fabian Hueske <[hidden email]>: > > > >> > > > >> Hi, > > > >>> > > > >>> Sorry for the late follow up. > > > >>> > > > >>> I think I understand the motivation for choosing ProtoBuf as the > > > >>> representation and serialization format and this makes sense to me. > > > >>> > > > >>> However, it might be a good idea to provide tooling to convert > Flink > > > >>> types > > > >>> (described as TypeInformation) to ProtoBuf. > > > >>> Otherwise, users of the model serving library would need to > manually > > > >>> convert their data types (say Scala tuples, case classes, or Avro > > > Pojos) > > > >>> to > > > >>> ProtoBuf messages. > > > >>> I don't think that this needs to be included in the first version > but > > > it > > > >>> might be a good extension to make the library easier to use. > > > >>> > > > >>> Best, > > > >>> Fabian > > > >>> > > > >>> > > > >>> > > > >>> 2017-11-28 17:22 GMT+01:00 Boris Lublinsky < > > > >>> [hidden email]> > > > >>> : > > > >>> > > > >>> Thanks Fabian, > > > >>>> More below > > > >>>> > > > >>>> > > > >>>> > > > >>>> Boris Lublinsky > > > >>>> FDP Architect > > > >>>> [hidden email] > > > >>>> https://www.lightbend.com/ > > > >>>> > > > >>>> On Nov 28, 2017, at 8:21 AM, Fabian Hueske <[hidden email]> > > wrote: > > > >>>> > > > >>>> Hi Boris and Stavros, > > > >>>> > > > >>>> Thanks for the responses. > > > >>>> > > > >>>> Ad 1) Thanks for the clarification. I think I misunderstood this > > part > > > of > > > >>>> the proposal. > > > >>>> I interpreted the argument why to chose ProtoBuf for network > > encoding > > > >>>> ("ability > > > >>>> to represent different data types") such that different a model > > > pipeline > > > >>>> should work on different data types. > > > >>>> I agree that it should be possible to give records of the same > type > > > (but > > > >>>> with different keys) to different models. The key-based join > > approach > > > >>>> looks > > > >>>> good to me. > > > >>>> > > > >>>> Ad 2) I understand that ProtoBuf is a good choice to serialize > > models > > > >>>> for > > > >>>> the given reasons. > > > >>>> However, the choice of ProtoBuf serialization for the records > might > > > make > > > >>>> the integration with existing libraries and also regular > DataStream > > > >>>> programs more difficult. > > > >>>> They all use Flink's TypeSerializer system to serialize and > > > deserialize > > > >>>> records by default. Hence, we would need to add a conversion step > > > before > > > >>>> records can be passed to a model serving operator. > > > >>>> Are you expecting some common format that all records follow (such > > as > > > a > > > >>>> Row or Vector type) or do you plan to support arbitrary records > such > > > as > > > >>>> Pojos? > > > >>>> If you plan for a specific type, you could add a TypeInformation > for > > > >>>> this > > > >>>> type with a TypeSerializer that is based on ProtoBuf. > > > >>>> > > > >>>> The way I look at it is slightly different. The common format for > > > >>>> records, supported by Flink, is Byte array with a little bit of > > > header, > > > >>>> describing data type and is used for routing. The actual > > unmarshalling > > > >>>> is > > > >>>> done by the model implementation itself. This provides the maximum > > > >>>> flexibility and gives user the freedom to create his own types > > without > > > >>>> breaking underlying framework. > > > >>>> > > > >>>> Ad 4) @Boris: I made this point not about the serialization format > > but > > > >>>> how the library would integrate with Flink's DataStream API. > > > >>>> I thought I had seen a code snippet that showed a new method on > the > > > >>>> DataStream object but cannot find this anymore. > > > >>>> So, I just wanted to make the point that we should not change the > > > >>>> DataStream API (unless it lacks support for some features) and > built > > > the > > > >>>> model serving library on top of it. > > > >>>> But I get from Stavros answer that this is your design anyway. > > > >>>> > > > >>>> Ad 5) The metrics system is the default way to expose system and > job > > > >>>> metrics in Flink. Due to the pluggable reporter interface and > > various > > > >>>> reporters, they can be easily integrated in many production > > > >>>> environments. > > > >>>> A solution based on queryable state will always need custom code > to > > > >>>> access the information. Of course this can be an optional feature. > > > >>>> > > > >>>> What do others think about this proposal? > > > >>>> > > > >>>> We had agreement among work group - Eron, Bas, Andrea, etc, but > you > > > are > > > >>>> the first one outside of it. My book https://www.lightbend.com > > > >>>> /blog/serving-machine-learning-models-free-oreilly- > > > ebook-from-lightbend > > > >>>> has > > > >>>> > > > >>>> a reasonably good reviews, so we are hoping this will work > > > >>>> > > > >>>> > > > >>>> Best, Fabian > > > >>>> > > > >>>> > > > >>>> 2017-11-28 13:53 GMT+01:00 Stavros Kontopoulos < > > > >>>> [hidden email]> > > > >>>> : > > > >>>> > > > >>>> Hi Fabian thanx! > > > >>>>> > > > >>>>> > > > >>>>> 1) Is it a strict requirement that a ML pipeline must be able to > > > handle > > > >>>>>> different input types? > > > >>>>>> I understand that it makes sense to have different models for > > > >>>>>> different > > > >>>>>> instances of the same type, i.e., same data type but different > > keys. > > > >>>>>> > > > >>>>> Hence, > > > >>>>> > > > >>>>>> the key-based joins make sense to me. However, couldn't > completely > > > >>>>>> different types be handled by different ML pipelines or would > > there > > > be > > > >>>>>> major drawbacks? > > > >>>>>> > > > >>>>> > > > >>>>> Could you elaborate more on this? Right now we only use keys when > > we > > > do > > > >>>>> the > > > >>>>> join. A given pipeline can handle only a well defined type (the > > type > > > >>>>> can > > > >>>>> be > > > >>>>> a simple string with a custom value, no need to be a > > > >>>>> class type) which serves as a key. > > > >>>>> > > > >>>>> 2) > > > >>>>> > > > >>>>> I think from an API point of view it would be better to not > require > > > >>>>> > > > >>>>>> input records to be encoded as ProtoBuf messages. Instead, the > > model > > > >>>>>> > > > >>>>> server > > > >>>>> > > > >>>>>> could accept strongly-typed objects (Java/Scala) and (if > > necessary) > > > >>>>>> > > > >>>>> convert > > > >>>>> > > > >>>>>> them to ProtoBuf messages internally. In case we need to support > > > >>>>>> > > > >>>>> different > > > >>>>> > > > >>>>>> types of records (see my first point), we can introduce a Union > > type > > > >>>>>> > > > >>>>> (i.e., > > > >>>>> > > > >>>>>> an n-ary Either type). I see that we need some kind of binary > > > encoding > > > >>>>>> format for the models but maybe also this can be designed to be > > > >>>>>> > > > >>>>> pluggable > > > >>>>> > > > >>>>>> such that later other encodings can be added. > > > >>>>>> > > > >>>>>> We do uses scala classes (strongly typed classes), protobuf is > > > only > > > >>>>> used > > > >>>>> on the wire. For on the wire encoding we prefer protobufs for > size, > > > >>>>> expressiveness and ability to represent different data types. > > > >>>>> > > > >>>>> 3) > > > >>>>> > > > >>>>> I think the DataStream Java API should be supported as a first > > class > > > >>>>> > > > >>>>>> citizens for this library. > > > >>>>>> > > > >>>>> > > > >>>>> I agree. It should be either first priority or a next thing to > do. > > > >>>>> > > > >>>>> > > > >>>>> 4) > > > >>>>> > > > >>>>> For the integration with the DataStream API, we could provide an > > API > > > >>>>> that > > > >>>>> > > > >>>>>> receives (typed) DataStream objects, internally constructs the > > > >>>>>> > > > >>>>> DataStream > > > >>>>> > > > >>>>>> operators, and returns one (or more) result DataStreams. The > > benefit > > > >>>>>> is > > > >>>>>> that we don't need to change the DataStream API directly, but > put > > a > > > >>>>>> > > > >>>>> library > > > >>>>> > > > >>>>>> on top. The other libraries (CEP, Table, Gelly) follow this > > > approach. > > > >>>>>> > > > >>>>> > > > >>>>> We will provide a DSL which will do jsut this. But even without > > the > > > >>>>> DSL > > > >>>>> this is what we do with low level joins. > > > >>>>> > > > >>>>> > > > >>>>> 5) > > > >>>>> > > > >>>>> I'm skeptical about using queryable state to expose metrics. Did > > you > > > >>>>>> consider using Flink's metrics system [1]? It is easily > > configurable > > > >>>>>> > > > >>>>> and we > > > >>>>> > > > >>>>>> provided several reporters that export the metrics. > > > >>>>>> > > > >>>>>> This of course is an option. The choice of queryable state was > > > mostly > > > >>>>> driven by a simplicity of real time integration. Any reason why > > > >>>>> metrics > > > >>>>> system is netter? > > > >>>>> > > > >>>>> > > > >>>>> Best, > > > >>>>> Stavros > > > >>>>> > > > >>>>> On Mon, Nov 27, 2017 at 4:23 PM, Fabian Hueske < > [hidden email]> > > > >>>>> wrote: > > > >>>>> > > > >>>>> Hi Stavros, > > > >>>>>> > > > >>>>>> thanks for the detailed FLIP! > > > >>>>>> Model serving is an important use case and it's great to see > > efforts > > > >>>>>> > > > >>>>> to add > > > >>>>> > > > >>>>>> a library for this to Flink! > > > >>>>>> > > > >>>>>> I've read the FLIP and would like to ask a few questions and > make > > > some > > > >>>>>> suggestions. > > > >>>>>> > > > >>>>>> 1) Is it a strict requirement that a ML pipeline must be able to > > > >>>>>> handle > > > >>>>>> different input types? > > > >>>>>> I understand that it makes sense to have different models for > > > >>>>>> different > > > >>>>>> instances of the same type, i.e., same data type but different > > keys. > > > >>>>>> > > > >>>>> Hence, > > > >>>>> > > > >>>>>> the key-based joins make sense to me. However, couldn't > completely > > > >>>>>> different types be handled by different ML pipelines or would > > there > > > be > > > >>>>>> major drawbacks? > > > >>>>>> > > > >>>>>> 2) I think from an API point of view it would be better to not > > > require > > > >>>>>> input records to be encoded as ProtoBuf messages. Instead, the > > model > > > >>>>>> > > > >>>>> server > > > >>>>> > > > >>>>>> could accept strongly-typed objects (Java/Scala) and (if > > necessary) > > > >>>>>> > > > >>>>> convert > > > >>>>> > > > >>>>>> them to ProtoBuf messages internally. In case we need to support > > > >>>>>> > > > >>>>> different > > > >>>>> > > > >>>>>> types of records (see my first point), we can introduce a Union > > type > > > >>>>>> > > > >>>>> (i.e., > > > >>>>> > > > >>>>>> an n-ary Either type). I see that we need some kind of binary > > > encoding > > > >>>>>> format for the models but maybe also this can be designed to be > > > >>>>>> > > > >>>>> pluggable > > > >>>>> > > > >>>>>> such that later other encodings can be added. > > > >>>>>> > > > >>>>>> 3) I think the DataStream Java API should be supported as a > first > > > >>>>>> class > > > >>>>>> citizens for this library. > > > >>>>>> > > > >>>>>> 4) For the integration with the DataStream API, we could provide > > an > > > >>>>>> API > > > >>>>>> that receives (typed) DataStream objects, internally constructs > > the > > > >>>>>> DataStream operators, and returns one (or more) result > > DataStreams. > > > >>>>>> The > > > >>>>>> benefit is that we don't need to change the DataStream API > > directly, > > > >>>>>> > > > >>>>> but > > > >>>>> > > > >>>>>> put a library on top. The other libraries (CEP, Table, Gelly) > > follow > > > >>>>>> > > > >>>>> this > > > >>>>> > > > >>>>>> approach. > > > >>>>>> > > > >>>>>> 5) I'm skeptical about using queryable state to expose metrics. > > Did > > > >>>>>> you > > > >>>>>> consider using Flink's metrics system [1]? It is easily > > configurable > > > >>>>>> > > > >>>>> and we > > > >>>>> > > > >>>>>> provided several reporters that export the metrics. > > > >>>>>> > > > >>>>>> What do you think? > > > >>>>>> Best, Fabian > > > >>>>>> > > > >>>>>> [1] > > > >>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/ > > > >>>>>> > > > >>>>> monitoring/ > > > >>>>> > > > >>>>>> metrics.html > > > >>>>>> > > > >>>>>> 2017-11-23 12:32 GMT+01:00 Stavros Kontopoulos < > > > >>>>>> > > > >>>>> [hidden email]>: > > > >>>>> > > > >>>>>> Hi guys, > > > >>>>>>> > > > >>>>>>> Let's discuss the new FLIP proposal for model serving over > Flink. > > > The > > > >>>>>>> > > > >>>>>> idea > > > >>>>>> > > > >>>>>>> is to combine previous efforts there and provide a library on > top > > > of > > > >>>>>>> > > > >>>>>> Flink > > > >>>>>> > > > >>>>>>> for serving models. > > > >>>>>>> > > > >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP- > > > >>>>>>> > > > >>>>>> 23+-+Model+Serving > > > >>>>>> > > > >>>>>>> Code from previous efforts can be found here: > > > >>>>>>> > > > >>>>>> https://github.com/FlinkML > > > >>>>> > > > >>>>>> Best, > > > >>>>>>> Stavros > > > >>>>>>> > > > >>>>>>> > > > >>>> > > > >>>> > > > > > > > > > > |
Hey all,
I'm wondering if somebody on the list can take a look at the PR from FLIP-23: https://github.com/apache/flink/pull/7446 On Mon, Oct 1, 2018 at 6:13 PM Rong Rong <[hidden email]> wrote: > Thanks for the contribution Boris!! I've been playing around with the basic > model for a while back and loved it. > +1 and really looking forward to having the feature merging back to Flink > ML. > > -- > Rong > > On Mon, Oct 1, 2018 at 7:55 AM Fabian Hueske <[hidden email]> wrote: > > > Hi everybody, > > > > The question of how to serve ML models in Flink applications came up in > > several conversations I had with Flink users in the last months. > > Recently, Boris approached me and he told me that he'd like to revive the > > efforts around FLIP-23 [1]. > > > > In the last days, Boris extended the proposal by a speculative model > > evaluation which allows for evaluating multiple modes of varying > complexity > > to ensure certain SLAs. > > The code does already exist in a Github repository [2]. > > > > Due to the frequent user requests and the fact that the code is already > > present, I think would be a great feature for Flink to have. > > Since this is a library on top of Flink's existing APIs this should not > be > > too hard to review. > > > > What do others think? > > > > Best, Fabian > > > > [1] > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving > > [2] https://github.com/FlinkML/flink-speculative-modelServer > > > > Am Mo., 5. Feb. 2018 um 13:11 Uhr schrieb Stavros Kontopoulos < > > [hidden email]>: > > > > > Thanx @Fabian. I will update the document accordingly wrt metrics. > > > I agree there are pros and cons. > > > > > > Best, > > > Stavros > > > > > > > > > On Wed, Jan 31, 2018 at 1:07 AM, Fabian Hueske <[hidden email]> > > wrote: > > > > > > > OK, I think there was plenty of time to comment on this FLIP. > > > > I'll move it to the ACCEPTED status. > > > > > > > > @Stavros, please consider the feedback regarding the metrics. > > > > I agree with Chesnay that metrics should be primarily exposed via the > > > > metrics system. > > > > Storing them in state makes them fault-tolerant and queryable if the > > > state > > > > is properly configured. > > > > > > > > Thanks, > > > > Fabian > > > > > > > > 2018-01-22 17:19 GMT+01:00 Chesnay Schepler <[hidden email]>: > > > > > > > > > I'm currently looking over it, but one thing that stood out was > that > > > the > > > > > FLIP proposes to use queryable state > > > > > as a monitoring solution. Given that we have a metric system that > > > > > integrates with plenty of commonly used > > > > > metric backends this doesn't really make sense to me. > > > > > > > > > > Storing them in state still has value in terms of fault-tolerance > > > though, > > > > > since this is something that the metric > > > > > system doesn't provide by itself. > > > > > > > > > > > > > > > On 18.01.2018 13:57, Fabian Hueske wrote: > > > > > > > > > >> Are there any more comments on the FLIP? > > > > >> > > > > >> Otherwise, I'd suggest to move the FLIP to the accepted FLIPs [1] > > and > > > > >> continue with the implementation. > > > > >> > > > > >> Also, is there a committer who'd like to shepherd the FLIP and > > review > > > > the > > > > >> corresponding PRs? > > > > >> Of course, everybody is welcome to review the code but we need at > > > least > > > > >> one > > > > >> committer who will eventually merge the changes. > > > > >> > > > > >> Best, > > > > >> Fabian > > > > >> > > > > >> [1] > > > > >> https://cwiki.apache.org/confluence/display/FLINK/Flink+ > > > > >> Improvement+Proposals > > > > >> > > > > >> 2017-12-04 10:54 GMT+01:00 Fabian Hueske <[hidden email]>: > > > > >> > > > > >> Hi, > > > > >>> > > > > >>> Sorry for the late follow up. > > > > >>> > > > > >>> I think I understand the motivation for choosing ProtoBuf as the > > > > >>> representation and serialization format and this makes sense to > me. > > > > >>> > > > > >>> However, it might be a good idea to provide tooling to convert > > Flink > > > > >>> types > > > > >>> (described as TypeInformation) to ProtoBuf. > > > > >>> Otherwise, users of the model serving library would need to > > manually > > > > >>> convert their data types (say Scala tuples, case classes, or Avro > > > > Pojos) > > > > >>> to > > > > >>> ProtoBuf messages. > > > > >>> I don't think that this needs to be included in the first version > > but > > > > it > > > > >>> might be a good extension to make the library easier to use. > > > > >>> > > > > >>> Best, > > > > >>> Fabian > > > > >>> > > > > >>> > > > > >>> > > > > >>> 2017-11-28 17:22 GMT+01:00 Boris Lublinsky < > > > > >>> [hidden email]> > > > > >>> : > > > > >>> > > > > >>> Thanks Fabian, > > > > >>>> More below > > > > >>>> > > > > >>>> > > > > >>>> > > > > >>>> Boris Lublinsky > > > > >>>> FDP Architect > > > > >>>> [hidden email] > > > > >>>> https://www.lightbend.com/ > > > > >>>> > > > > >>>> On Nov 28, 2017, at 8:21 AM, Fabian Hueske <[hidden email]> > > > wrote: > > > > >>>> > > > > >>>> Hi Boris and Stavros, > > > > >>>> > > > > >>>> Thanks for the responses. > > > > >>>> > > > > >>>> Ad 1) Thanks for the clarification. I think I misunderstood this > > > part > > > > of > > > > >>>> the proposal. > > > > >>>> I interpreted the argument why to chose ProtoBuf for network > > > encoding > > > > >>>> ("ability > > > > >>>> to represent different data types") such that different a model > > > > pipeline > > > > >>>> should work on different data types. > > > > >>>> I agree that it should be possible to give records of the same > > type > > > > (but > > > > >>>> with different keys) to different models. The key-based join > > > approach > > > > >>>> looks > > > > >>>> good to me. > > > > >>>> > > > > >>>> Ad 2) I understand that ProtoBuf is a good choice to serialize > > > models > > > > >>>> for > > > > >>>> the given reasons. > > > > >>>> However, the choice of ProtoBuf serialization for the records > > might > > > > make > > > > >>>> the integration with existing libraries and also regular > > DataStream > > > > >>>> programs more difficult. > > > > >>>> They all use Flink's TypeSerializer system to serialize and > > > > deserialize > > > > >>>> records by default. Hence, we would need to add a conversion > step > > > > before > > > > >>>> records can be passed to a model serving operator. > > > > >>>> Are you expecting some common format that all records follow > (such > > > as > > > > a > > > > >>>> Row or Vector type) or do you plan to support arbitrary records > > such > > > > as > > > > >>>> Pojos? > > > > >>>> If you plan for a specific type, you could add a TypeInformation > > for > > > > >>>> this > > > > >>>> type with a TypeSerializer that is based on ProtoBuf. > > > > >>>> > > > > >>>> The way I look at it is slightly different. The common format > for > > > > >>>> records, supported by Flink, is Byte array with a little bit of > > > > header, > > > > >>>> describing data type and is used for routing. The actual > > > unmarshalling > > > > >>>> is > > > > >>>> done by the model implementation itself. This provides the > maximum > > > > >>>> flexibility and gives user the freedom to create his own types > > > without > > > > >>>> breaking underlying framework. > > > > >>>> > > > > >>>> Ad 4) @Boris: I made this point not about the serialization > format > > > but > > > > >>>> how the library would integrate with Flink's DataStream API. > > > > >>>> I thought I had seen a code snippet that showed a new method on > > the > > > > >>>> DataStream object but cannot find this anymore. > > > > >>>> So, I just wanted to make the point that we should not change > the > > > > >>>> DataStream API (unless it lacks support for some features) and > > built > > > > the > > > > >>>> model serving library on top of it. > > > > >>>> But I get from Stavros answer that this is your design anyway. > > > > >>>> > > > > >>>> Ad 5) The metrics system is the default way to expose system and > > job > > > > >>>> metrics in Flink. Due to the pluggable reporter interface and > > > various > > > > >>>> reporters, they can be easily integrated in many production > > > > >>>> environments. > > > > >>>> A solution based on queryable state will always need custom code > > to > > > > >>>> access the information. Of course this can be an optional > feature. > > > > >>>> > > > > >>>> What do others think about this proposal? > > > > >>>> > > > > >>>> We had agreement among work group - Eron, Bas, Andrea, etc, but > > you > > > > are > > > > >>>> the first one outside of it. My book https://www.lightbend.com > > > > >>>> /blog/serving-machine-learning-models-free-oreilly- > > > > ebook-from-lightbend > > > > >>>> has > > > > >>>> > > > > >>>> a reasonably good reviews, so we are hoping this will work > > > > >>>> > > > > >>>> > > > > >>>> Best, Fabian > > > > >>>> > > > > >>>> > > > > >>>> 2017-11-28 13:53 GMT+01:00 Stavros Kontopoulos < > > > > >>>> [hidden email]> > > > > >>>> : > > > > >>>> > > > > >>>> Hi Fabian thanx! > > > > >>>>> > > > > >>>>> > > > > >>>>> 1) Is it a strict requirement that a ML pipeline must be able > to > > > > handle > > > > >>>>>> different input types? > > > > >>>>>> I understand that it makes sense to have different models for > > > > >>>>>> different > > > > >>>>>> instances of the same type, i.e., same data type but different > > > keys. > > > > >>>>>> > > > > >>>>> Hence, > > > > >>>>> > > > > >>>>>> the key-based joins make sense to me. However, couldn't > > completely > > > > >>>>>> different types be handled by different ML pipelines or would > > > there > > > > be > > > > >>>>>> major drawbacks? > > > > >>>>>> > > > > >>>>> > > > > >>>>> Could you elaborate more on this? Right now we only use keys > when > > > we > > > > do > > > > >>>>> the > > > > >>>>> join. A given pipeline can handle only a well defined type (the > > > type > > > > >>>>> can > > > > >>>>> be > > > > >>>>> a simple string with a custom value, no need to be a > > > > >>>>> class type) which serves as a key. > > > > >>>>> > > > > >>>>> 2) > > > > >>>>> > > > > >>>>> I think from an API point of view it would be better to not > > require > > > > >>>>> > > > > >>>>>> input records to be encoded as ProtoBuf messages. Instead, the > > > model > > > > >>>>>> > > > > >>>>> server > > > > >>>>> > > > > >>>>>> could accept strongly-typed objects (Java/Scala) and (if > > > necessary) > > > > >>>>>> > > > > >>>>> convert > > > > >>>>> > > > > >>>>>> them to ProtoBuf messages internally. In case we need to > support > > > > >>>>>> > > > > >>>>> different > > > > >>>>> > > > > >>>>>> types of records (see my first point), we can introduce a > Union > > > type > > > > >>>>>> > > > > >>>>> (i.e., > > > > >>>>> > > > > >>>>>> an n-ary Either type). I see that we need some kind of binary > > > > encoding > > > > >>>>>> format for the models but maybe also this can be designed to > be > > > > >>>>>> > > > > >>>>> pluggable > > > > >>>>> > > > > >>>>>> such that later other encodings can be added. > > > > >>>>>> > > > > >>>>>> We do uses scala classes (strongly typed classes), protobuf > is > > > > only > > > > >>>>> used > > > > >>>>> on the wire. For on the wire encoding we prefer protobufs for > > size, > > > > >>>>> expressiveness and ability to represent different data types. > > > > >>>>> > > > > >>>>> 3) > > > > >>>>> > > > > >>>>> I think the DataStream Java API should be supported as a first > > > class > > > > >>>>> > > > > >>>>>> citizens for this library. > > > > >>>>>> > > > > >>>>> > > > > >>>>> I agree. It should be either first priority or a next thing to > > do. > > > > >>>>> > > > > >>>>> > > > > >>>>> 4) > > > > >>>>> > > > > >>>>> For the integration with the DataStream API, we could provide > an > > > API > > > > >>>>> that > > > > >>>>> > > > > >>>>>> receives (typed) DataStream objects, internally constructs the > > > > >>>>>> > > > > >>>>> DataStream > > > > >>>>> > > > > >>>>>> operators, and returns one (or more) result DataStreams. The > > > benefit > > > > >>>>>> is > > > > >>>>>> that we don't need to change the DataStream API directly, but > > put > > > a > > > > >>>>>> > > > > >>>>> library > > > > >>>>> > > > > >>>>>> on top. The other libraries (CEP, Table, Gelly) follow this > > > > approach. > > > > >>>>>> > > > > >>>>> > > > > >>>>> We will provide a DSL which will do jsut this. But even > without > > > the > > > > >>>>> DSL > > > > >>>>> this is what we do with low level joins. > > > > >>>>> > > > > >>>>> > > > > >>>>> 5) > > > > >>>>> > > > > >>>>> I'm skeptical about using queryable state to expose metrics. > Did > > > you > > > > >>>>>> consider using Flink's metrics system [1]? It is easily > > > configurable > > > > >>>>>> > > > > >>>>> and we > > > > >>>>> > > > > >>>>>> provided several reporters that export the metrics. > > > > >>>>>> > > > > >>>>>> This of course is an option. The choice of queryable state was > > > > mostly > > > > >>>>> driven by a simplicity of real time integration. Any reason > why > > > > >>>>> metrics > > > > >>>>> system is netter? > > > > >>>>> > > > > >>>>> > > > > >>>>> Best, > > > > >>>>> Stavros > > > > >>>>> > > > > >>>>> On Mon, Nov 27, 2017 at 4:23 PM, Fabian Hueske < > > [hidden email]> > > > > >>>>> wrote: > > > > >>>>> > > > > >>>>> Hi Stavros, > > > > >>>>>> > > > > >>>>>> thanks for the detailed FLIP! > > > > >>>>>> Model serving is an important use case and it's great to see > > > efforts > > > > >>>>>> > > > > >>>>> to add > > > > >>>>> > > > > >>>>>> a library for this to Flink! > > > > >>>>>> > > > > >>>>>> I've read the FLIP and would like to ask a few questions and > > make > > > > some > > > > >>>>>> suggestions. > > > > >>>>>> > > > > >>>>>> 1) Is it a strict requirement that a ML pipeline must be able > to > > > > >>>>>> handle > > > > >>>>>> different input types? > > > > >>>>>> I understand that it makes sense to have different models for > > > > >>>>>> different > > > > >>>>>> instances of the same type, i.e., same data type but different > > > keys. > > > > >>>>>> > > > > >>>>> Hence, > > > > >>>>> > > > > >>>>>> the key-based joins make sense to me. However, couldn't > > completely > > > > >>>>>> different types be handled by different ML pipelines or would > > > there > > > > be > > > > >>>>>> major drawbacks? > > > > >>>>>> > > > > >>>>>> 2) I think from an API point of view it would be better to not > > > > require > > > > >>>>>> input records to be encoded as ProtoBuf messages. Instead, the > > > model > > > > >>>>>> > > > > >>>>> server > > > > >>>>> > > > > >>>>>> could accept strongly-typed objects (Java/Scala) and (if > > > necessary) > > > > >>>>>> > > > > >>>>> convert > > > > >>>>> > > > > >>>>>> them to ProtoBuf messages internally. In case we need to > support > > > > >>>>>> > > > > >>>>> different > > > > >>>>> > > > > >>>>>> types of records (see my first point), we can introduce a > Union > > > type > > > > >>>>>> > > > > >>>>> (i.e., > > > > >>>>> > > > > >>>>>> an n-ary Either type). I see that we need some kind of binary > > > > encoding > > > > >>>>>> format for the models but maybe also this can be designed to > be > > > > >>>>>> > > > > >>>>> pluggable > > > > >>>>> > > > > >>>>>> such that later other encodings can be added. > > > > >>>>>> > > > > >>>>>> 3) I think the DataStream Java API should be supported as a > > first > > > > >>>>>> class > > > > >>>>>> citizens for this library. > > > > >>>>>> > > > > >>>>>> 4) For the integration with the DataStream API, we could > provide > > > an > > > > >>>>>> API > > > > >>>>>> that receives (typed) DataStream objects, internally > constructs > > > the > > > > >>>>>> DataStream operators, and returns one (or more) result > > > DataStreams. > > > > >>>>>> The > > > > >>>>>> benefit is that we don't need to change the DataStream API > > > directly, > > > > >>>>>> > > > > >>>>> but > > > > >>>>> > > > > >>>>>> put a library on top. The other libraries (CEP, Table, Gelly) > > > follow > > > > >>>>>> > > > > >>>>> this > > > > >>>>> > > > > >>>>>> approach. > > > > >>>>>> > > > > >>>>>> 5) I'm skeptical about using queryable state to expose > metrics. > > > Did > > > > >>>>>> you > > > > >>>>>> consider using Flink's metrics system [1]? It is easily > > > configurable > > > > >>>>>> > > > > >>>>> and we > > > > >>>>> > > > > >>>>>> provided several reporters that export the metrics. > > > > >>>>>> > > > > >>>>>> What do you think? > > > > >>>>>> Best, Fabian > > > > >>>>>> > > > > >>>>>> [1] > > > > >>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/ > > > > >>>>>> > > > > >>>>> monitoring/ > > > > >>>>> > > > > >>>>>> metrics.html > > > > >>>>>> > > > > >>>>>> 2017-11-23 12:32 GMT+01:00 Stavros Kontopoulos < > > > > >>>>>> > > > > >>>>> [hidden email]>: > > > > >>>>> > > > > >>>>>> Hi guys, > > > > >>>>>>> > > > > >>>>>>> Let's discuss the new FLIP proposal for model serving over > > Flink. > > > > The > > > > >>>>>>> > > > > >>>>>> idea > > > > >>>>>> > > > > >>>>>>> is to combine previous efforts there and provide a library on > > top > > > > of > > > > >>>>>>> > > > > >>>>>> Flink > > > > >>>>>> > > > > >>>>>>> for serving models. > > > > >>>>>>> > > > > >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP- > > > > >>>>>>> > > > > >>>>>> 23+-+Model+Serving > > > > >>>>>> > > > > >>>>>>> Code from previous efforts can be found here: > > > > >>>>>>> > > > > >>>>>> https://github.com/FlinkML > > > > >>>>> > > > > >>>>>> Best, > > > > >>>>>>> Stavros > > > > >>>>>>> > > > > >>>>>>> > > > > >>>> > > > > >>>> > > > > > > > > > > > > > > > |
Hi Robert, Boris,
Sorry for the late reply. I took some time to look at the implementation. I think it looks good overall to me. Since this is pretty much a big PR. I would like to raise a bit of discussion beforehand: 1. In order to simplify the PR complexity, can we do the following? - the example in a separated PR - either the Java or Scala implementation of the model-serving module into a separated PR (they look pretty much similar to me) This way it makes the PR more clean and readable. 2. I think the idea to carve out this implementation into: - model layer; - serving layer; - queryable state API is a very good idea! On a higher level, this should enable usage of any model in serving perspective, so the question is: does DataConverter / Model conformed with Ski-learn's pipeline definition (My read on this is: yes). My understanding is this is generic enough to support any "Model", for example the model defined in FLIP-39, but it might be a good idea for the authors to also take a look (CCed: @weihua, @shaoxuan) 3. Implementation - following up with #1, can we move the architecture to - model-server-base - model-server-java/scala //specific to each language There are many duplicate codes between the Java and Scala implementations that I think might be able to put them as a base, for example, implemented in Scala (since most of the basic math / ML packages[1][2] are all implemented Scala-based). - Type system of this PR does not use too much of Flink's type information (other than the ByteArray, which is a pass-through), maybe we can leverage this for the strong-type guarantee on the data processor side. (CCed @timo who might have some idea regarding the type system in a whole, as he is working on the table type system FLIP-37). My thoughts is, the model itself is protobuf thus must be in ByteArray, but some auxiliary data (stats, additional data type, record type, etc) can use Flink's type system. I will finish the review and add comments to the PR asap. Thanks, Rong [1] https://github.com/scalanlp/breeze [2] https://spark.apache.org/docs/latest/ml-guide.html On Tue, Apr 30, 2019 at 2:33 AM Robert Metzger <[hidden email]> wrote: > Hey all, > > I'm wondering if somebody on the list can take a look at the PR from > FLIP-23: https://github.com/apache/flink/pull/7446 > > > On Mon, Oct 1, 2018 at 6:13 PM Rong Rong <[hidden email]> wrote: > > > Thanks for the contribution Boris!! I've been playing around with the > basic > > model for a while back and loved it. > > +1 and really looking forward to having the feature merging back to Flink > > ML. > > > > -- > > Rong > > > > On Mon, Oct 1, 2018 at 7:55 AM Fabian Hueske <[hidden email]> wrote: > > > > > Hi everybody, > > > > > > The question of how to serve ML models in Flink applications came up in > > > several conversations I had with Flink users in the last months. > > > Recently, Boris approached me and he told me that he'd like to revive > the > > > efforts around FLIP-23 [1]. > > > > > > In the last days, Boris extended the proposal by a speculative model > > > evaluation which allows for evaluating multiple modes of varying > > complexity > > > to ensure certain SLAs. > > > The code does already exist in a Github repository [2]. > > > > > > Due to the frequent user requests and the fact that the code is already > > > present, I think would be a great feature for Flink to have. > > > Since this is a library on top of Flink's existing APIs this should not > > be > > > too hard to review. > > > > > > What do others think? > > > > > > Best, Fabian > > > > > > [1] > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving > > > [2] https://github.com/FlinkML/flink-speculative-modelServer > > > > > > Am Mo., 5. Feb. 2018 um 13:11 Uhr schrieb Stavros Kontopoulos < > > > [hidden email]>: > > > > > > > Thanx @Fabian. I will update the document accordingly wrt metrics. > > > > I agree there are pros and cons. > > > > > > > > Best, > > > > Stavros > > > > > > > > > > > > On Wed, Jan 31, 2018 at 1:07 AM, Fabian Hueske <[hidden email]> > > > wrote: > > > > > > > > > OK, I think there was plenty of time to comment on this FLIP. > > > > > I'll move it to the ACCEPTED status. > > > > > > > > > > @Stavros, please consider the feedback regarding the metrics. > > > > > I agree with Chesnay that metrics should be primarily exposed via > the > > > > > metrics system. > > > > > Storing them in state makes them fault-tolerant and queryable if > the > > > > state > > > > > is properly configured. > > > > > > > > > > Thanks, > > > > > Fabian > > > > > > > > > > 2018-01-22 17:19 GMT+01:00 Chesnay Schepler <[hidden email]>: > > > > > > > > > > > I'm currently looking over it, but one thing that stood out was > > that > > > > the > > > > > > FLIP proposes to use queryable state > > > > > > as a monitoring solution. Given that we have a metric system that > > > > > > integrates with plenty of commonly used > > > > > > metric backends this doesn't really make sense to me. > > > > > > > > > > > > Storing them in state still has value in terms of fault-tolerance > > > > though, > > > > > > since this is something that the metric > > > > > > system doesn't provide by itself. > > > > > > > > > > > > > > > > > > On 18.01.2018 13:57, Fabian Hueske wrote: > > > > > > > > > > > >> Are there any more comments on the FLIP? > > > > > >> > > > > > >> Otherwise, I'd suggest to move the FLIP to the accepted FLIPs > [1] > > > and > > > > > >> continue with the implementation. > > > > > >> > > > > > >> Also, is there a committer who'd like to shepherd the FLIP and > > > review > > > > > the > > > > > >> corresponding PRs? > > > > > >> Of course, everybody is welcome to review the code but we need > at > > > > least > > > > > >> one > > > > > >> committer who will eventually merge the changes. > > > > > >> > > > > > >> Best, > > > > > >> Fabian > > > > > >> > > > > > >> [1] > > > > > >> https://cwiki.apache.org/confluence/display/FLINK/Flink+ > > > > > >> Improvement+Proposals > > > > > >> > > > > > >> 2017-12-04 10:54 GMT+01:00 Fabian Hueske <[hidden email]>: > > > > > >> > > > > > >> Hi, > > > > > >>> > > > > > >>> Sorry for the late follow up. > > > > > >>> > > > > > >>> I think I understand the motivation for choosing ProtoBuf as > the > > > > > >>> representation and serialization format and this makes sense to > > me. > > > > > >>> > > > > > >>> However, it might be a good idea to provide tooling to convert > > > Flink > > > > > >>> types > > > > > >>> (described as TypeInformation) to ProtoBuf. > > > > > >>> Otherwise, users of the model serving library would need to > > > manually > > > > > >>> convert their data types (say Scala tuples, case classes, or > Avro > > > > > Pojos) > > > > > >>> to > > > > > >>> ProtoBuf messages. > > > > > >>> I don't think that this needs to be included in the first > version > > > but > > > > > it > > > > > >>> might be a good extension to make the library easier to use. > > > > > >>> > > > > > >>> Best, > > > > > >>> Fabian > > > > > >>> > > > > > >>> > > > > > >>> > > > > > >>> 2017-11-28 17:22 GMT+01:00 Boris Lublinsky < > > > > > >>> [hidden email]> > > > > > >>> : > > > > > >>> > > > > > >>> Thanks Fabian, > > > > > >>>> More below > > > > > >>>> > > > > > >>>> > > > > > >>>> > > > > > >>>> Boris Lublinsky > > > > > >>>> FDP Architect > > > > > >>>> [hidden email] > > > > > >>>> https://www.lightbend.com/ > > > > > >>>> > > > > > >>>> On Nov 28, 2017, at 8:21 AM, Fabian Hueske <[hidden email] > > > > > > wrote: > > > > > >>>> > > > > > >>>> Hi Boris and Stavros, > > > > > >>>> > > > > > >>>> Thanks for the responses. > > > > > >>>> > > > > > >>>> Ad 1) Thanks for the clarification. I think I misunderstood > this > > > > part > > > > > of > > > > > >>>> the proposal. > > > > > >>>> I interpreted the argument why to chose ProtoBuf for network > > > > encoding > > > > > >>>> ("ability > > > > > >>>> to represent different data types") such that different a > model > > > > > pipeline > > > > > >>>> should work on different data types. > > > > > >>>> I agree that it should be possible to give records of the same > > > type > > > > > (but > > > > > >>>> with different keys) to different models. The key-based join > > > > approach > > > > > >>>> looks > > > > > >>>> good to me. > > > > > >>>> > > > > > >>>> Ad 2) I understand that ProtoBuf is a good choice to serialize > > > > models > > > > > >>>> for > > > > > >>>> the given reasons. > > > > > >>>> However, the choice of ProtoBuf serialization for the records > > > might > > > > > make > > > > > >>>> the integration with existing libraries and also regular > > > DataStream > > > > > >>>> programs more difficult. > > > > > >>>> They all use Flink's TypeSerializer system to serialize and > > > > > deserialize > > > > > >>>> records by default. Hence, we would need to add a conversion > > step > > > > > before > > > > > >>>> records can be passed to a model serving operator. > > > > > >>>> Are you expecting some common format that all records follow > > (such > > > > as > > > > > a > > > > > >>>> Row or Vector type) or do you plan to support arbitrary > records > > > such > > > > > as > > > > > >>>> Pojos? > > > > > >>>> If you plan for a specific type, you could add a > TypeInformation > > > for > > > > > >>>> this > > > > > >>>> type with a TypeSerializer that is based on ProtoBuf. > > > > > >>>> > > > > > >>>> The way I look at it is slightly different. The common format > > for > > > > > >>>> records, supported by Flink, is Byte array with a little bit > of > > > > > header, > > > > > >>>> describing data type and is used for routing. The actual > > > > unmarshalling > > > > > >>>> is > > > > > >>>> done by the model implementation itself. This provides the > > maximum > > > > > >>>> flexibility and gives user the freedom to create his own types > > > > without > > > > > >>>> breaking underlying framework. > > > > > >>>> > > > > > >>>> Ad 4) @Boris: I made this point not about the serialization > > format > > > > but > > > > > >>>> how the library would integrate with Flink's DataStream API. > > > > > >>>> I thought I had seen a code snippet that showed a new method > on > > > the > > > > > >>>> DataStream object but cannot find this anymore. > > > > > >>>> So, I just wanted to make the point that we should not change > > the > > > > > >>>> DataStream API (unless it lacks support for some features) and > > > built > > > > > the > > > > > >>>> model serving library on top of it. > > > > > >>>> But I get from Stavros answer that this is your design anyway. > > > > > >>>> > > > > > >>>> Ad 5) The metrics system is the default way to expose system > and > > > job > > > > > >>>> metrics in Flink. Due to the pluggable reporter interface and > > > > various > > > > > >>>> reporters, they can be easily integrated in many production > > > > > >>>> environments. > > > > > >>>> A solution based on queryable state will always need custom > code > > > to > > > > > >>>> access the information. Of course this can be an optional > > feature. > > > > > >>>> > > > > > >>>> What do others think about this proposal? > > > > > >>>> > > > > > >>>> We had agreement among work group - Eron, Bas, Andrea, etc, > but > > > you > > > > > are > > > > > >>>> the first one outside of it. My book > https://www.lightbend.com > > > > > >>>> /blog/serving-machine-learning-models-free-oreilly- > > > > > ebook-from-lightbend > > > > > >>>> has > > > > > >>>> > > > > > >>>> a reasonably good reviews, so we are hoping this will work > > > > > >>>> > > > > > >>>> > > > > > >>>> Best, Fabian > > > > > >>>> > > > > > >>>> > > > > > >>>> 2017-11-28 13:53 GMT+01:00 Stavros Kontopoulos < > > > > > >>>> [hidden email]> > > > > > >>>> : > > > > > >>>> > > > > > >>>> Hi Fabian thanx! > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> 1) Is it a strict requirement that a ML pipeline must be able > > to > > > > > handle > > > > > >>>>>> different input types? > > > > > >>>>>> I understand that it makes sense to have different models > for > > > > > >>>>>> different > > > > > >>>>>> instances of the same type, i.e., same data type but > different > > > > keys. > > > > > >>>>>> > > > > > >>>>> Hence, > > > > > >>>>> > > > > > >>>>>> the key-based joins make sense to me. However, couldn't > > > completely > > > > > >>>>>> different types be handled by different ML pipelines or > would > > > > there > > > > > be > > > > > >>>>>> major drawbacks? > > > > > >>>>>> > > > > > >>>>> > > > > > >>>>> Could you elaborate more on this? Right now we only use keys > > when > > > > we > > > > > do > > > > > >>>>> the > > > > > >>>>> join. A given pipeline can handle only a well defined type > (the > > > > type > > > > > >>>>> can > > > > > >>>>> be > > > > > >>>>> a simple string with a custom value, no need to be a > > > > > >>>>> class type) which serves as a key. > > > > > >>>>> > > > > > >>>>> 2) > > > > > >>>>> > > > > > >>>>> I think from an API point of view it would be better to not > > > require > > > > > >>>>> > > > > > >>>>>> input records to be encoded as ProtoBuf messages. Instead, > the > > > > model > > > > > >>>>>> > > > > > >>>>> server > > > > > >>>>> > > > > > >>>>>> could accept strongly-typed objects (Java/Scala) and (if > > > > necessary) > > > > > >>>>>> > > > > > >>>>> convert > > > > > >>>>> > > > > > >>>>>> them to ProtoBuf messages internally. In case we need to > > support > > > > > >>>>>> > > > > > >>>>> different > > > > > >>>>> > > > > > >>>>>> types of records (see my first point), we can introduce a > > Union > > > > type > > > > > >>>>>> > > > > > >>>>> (i.e., > > > > > >>>>> > > > > > >>>>>> an n-ary Either type). I see that we need some kind of > binary > > > > > encoding > > > > > >>>>>> format for the models but maybe also this can be designed to > > be > > > > > >>>>>> > > > > > >>>>> pluggable > > > > > >>>>> > > > > > >>>>>> such that later other encodings can be added. > > > > > >>>>>> > > > > > >>>>>> We do uses scala classes (strongly typed classes), > protobuf > > is > > > > > only > > > > > >>>>> used > > > > > >>>>> on the wire. For on the wire encoding we prefer protobufs for > > > size, > > > > > >>>>> expressiveness and ability to represent different data types. > > > > > >>>>> > > > > > >>>>> 3) > > > > > >>>>> > > > > > >>>>> I think the DataStream Java API should be supported as a > first > > > > class > > > > > >>>>> > > > > > >>>>>> citizens for this library. > > > > > >>>>>> > > > > > >>>>> > > > > > >>>>> I agree. It should be either first priority or a next thing > to > > > do. > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> 4) > > > > > >>>>> > > > > > >>>>> For the integration with the DataStream API, we could provide > > an > > > > API > > > > > >>>>> that > > > > > >>>>> > > > > > >>>>>> receives (typed) DataStream objects, internally constructs > the > > > > > >>>>>> > > > > > >>>>> DataStream > > > > > >>>>> > > > > > >>>>>> operators, and returns one (or more) result DataStreams. The > > > > benefit > > > > > >>>>>> is > > > > > >>>>>> that we don't need to change the DataStream API directly, > but > > > put > > > > a > > > > > >>>>>> > > > > > >>>>> library > > > > > >>>>> > > > > > >>>>>> on top. The other libraries (CEP, Table, Gelly) follow this > > > > > approach. > > > > > >>>>>> > > > > > >>>>> > > > > > >>>>> We will provide a DSL which will do jsut this. But even > > without > > > > the > > > > > >>>>> DSL > > > > > >>>>> this is what we do with low level joins. > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> 5) > > > > > >>>>> > > > > > >>>>> I'm skeptical about using queryable state to expose metrics. > > Did > > > > you > > > > > >>>>>> consider using Flink's metrics system [1]? It is easily > > > > configurable > > > > > >>>>>> > > > > > >>>>> and we > > > > > >>>>> > > > > > >>>>>> provided several reporters that export the metrics. > > > > > >>>>>> > > > > > >>>>>> This of course is an option. The choice of queryable state > was > > > > > mostly > > > > > >>>>> driven by a simplicity of real time integration. Any reason > > why > > > > > >>>>> metrics > > > > > >>>>> system is netter? > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> Best, > > > > > >>>>> Stavros > > > > > >>>>> > > > > > >>>>> On Mon, Nov 27, 2017 at 4:23 PM, Fabian Hueske < > > > [hidden email]> > > > > > >>>>> wrote: > > > > > >>>>> > > > > > >>>>> Hi Stavros, > > > > > >>>>>> > > > > > >>>>>> thanks for the detailed FLIP! > > > > > >>>>>> Model serving is an important use case and it's great to see > > > > efforts > > > > > >>>>>> > > > > > >>>>> to add > > > > > >>>>> > > > > > >>>>>> a library for this to Flink! > > > > > >>>>>> > > > > > >>>>>> I've read the FLIP and would like to ask a few questions and > > > make > > > > > some > > > > > >>>>>> suggestions. > > > > > >>>>>> > > > > > >>>>>> 1) Is it a strict requirement that a ML pipeline must be > able > > to > > > > > >>>>>> handle > > > > > >>>>>> different input types? > > > > > >>>>>> I understand that it makes sense to have different models > for > > > > > >>>>>> different > > > > > >>>>>> instances of the same type, i.e., same data type but > different > > > > keys. > > > > > >>>>>> > > > > > >>>>> Hence, > > > > > >>>>> > > > > > >>>>>> the key-based joins make sense to me. However, couldn't > > > completely > > > > > >>>>>> different types be handled by different ML pipelines or > would > > > > there > > > > > be > > > > > >>>>>> major drawbacks? > > > > > >>>>>> > > > > > >>>>>> 2) I think from an API point of view it would be better to > not > > > > > require > > > > > >>>>>> input records to be encoded as ProtoBuf messages. Instead, > the > > > > model > > > > > >>>>>> > > > > > >>>>> server > > > > > >>>>> > > > > > >>>>>> could accept strongly-typed objects (Java/Scala) and (if > > > > necessary) > > > > > >>>>>> > > > > > >>>>> convert > > > > > >>>>> > > > > > >>>>>> them to ProtoBuf messages internally. In case we need to > > support > > > > > >>>>>> > > > > > >>>>> different > > > > > >>>>> > > > > > >>>>>> types of records (see my first point), we can introduce a > > Union > > > > type > > > > > >>>>>> > > > > > >>>>> (i.e., > > > > > >>>>> > > > > > >>>>>> an n-ary Either type). I see that we need some kind of > binary > > > > > encoding > > > > > >>>>>> format for the models but maybe also this can be designed to > > be > > > > > >>>>>> > > > > > >>>>> pluggable > > > > > >>>>> > > > > > >>>>>> such that later other encodings can be added. > > > > > >>>>>> > > > > > >>>>>> 3) I think the DataStream Java API should be supported as a > > > first > > > > > >>>>>> class > > > > > >>>>>> citizens for this library. > > > > > >>>>>> > > > > > >>>>>> 4) For the integration with the DataStream API, we could > > provide > > > > an > > > > > >>>>>> API > > > > > >>>>>> that receives (typed) DataStream objects, internally > > constructs > > > > the > > > > > >>>>>> DataStream operators, and returns one (or more) result > > > > DataStreams. > > > > > >>>>>> The > > > > > >>>>>> benefit is that we don't need to change the DataStream API > > > > directly, > > > > > >>>>>> > > > > > >>>>> but > > > > > >>>>> > > > > > >>>>>> put a library on top. The other libraries (CEP, Table, > Gelly) > > > > follow > > > > > >>>>>> > > > > > >>>>> this > > > > > >>>>> > > > > > >>>>>> approach. > > > > > >>>>>> > > > > > >>>>>> 5) I'm skeptical about using queryable state to expose > > metrics. > > > > Did > > > > > >>>>>> you > > > > > >>>>>> consider using Flink's metrics system [1]? It is easily > > > > configurable > > > > > >>>>>> > > > > > >>>>> and we > > > > > >>>>> > > > > > >>>>>> provided several reporters that export the metrics. > > > > > >>>>>> > > > > > >>>>>> What do you think? > > > > > >>>>>> Best, Fabian > > > > > >>>>>> > > > > > >>>>>> [1] > > > > > >>>>>> > https://ci.apache.org/projects/flink/flink-docs-release-1.3/ > > > > > >>>>>> > > > > > >>>>> monitoring/ > > > > > >>>>> > > > > > >>>>>> metrics.html > > > > > >>>>>> > > > > > >>>>>> 2017-11-23 12:32 GMT+01:00 Stavros Kontopoulos < > > > > > >>>>>> > > > > > >>>>> [hidden email]>: > > > > > >>>>> > > > > > >>>>>> Hi guys, > > > > > >>>>>>> > > > > > >>>>>>> Let's discuss the new FLIP proposal for model serving over > > > Flink. > > > > > The > > > > > >>>>>>> > > > > > >>>>>> idea > > > > > >>>>>> > > > > > >>>>>>> is to combine previous efforts there and provide a library > on > > > top > > > > > of > > > > > >>>>>>> > > > > > >>>>>> Flink > > > > > >>>>>> > > > > > >>>>>>> for serving models. > > > > > >>>>>>> > > > > > >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP- > > > > > >>>>>>> > > > > > >>>>>> 23+-+Model+Serving > > > > > >>>>>> > > > > > >>>>>>> Code from previous efforts can be found here: > > > > > >>>>>>> > > > > > >>>>>> https://github.com/FlinkML > > > > > >>>>> > > > > > >>>>>> Best, > > > > > >>>>>>> Stavros > > > > > >>>>>>> > > > > > >>>>>>> > > > > > >>>> > > > > > >>>> > > > > > > > > > > > > > > > > > > > > > |
Thanks Rong, really appreciate review.
See some answers below Boris Lublinsky FDP Architect [hidden email] https://www.lightbend.com/ > On May 5, 2019, at 7:52 PM, Rong Rong <[hidden email]> wrote: > > Hi Robert, Boris, > > Sorry for the late reply. I took some time to look at the implementation. I think it looks good overall to me. > Since this is pretty much a big PR. I would like to raise a bit of discussion beforehand: > > 1. In order to simplify the PR complexity, can we do the following? > - the example in a separated PR > - either the Java or Scala implementation of the model-serving module into a separated PR (they look pretty much similar to me) > This way it makes the PR more clean and readable. Java and Scala implementations provide identical functionality, so they can be separated. I like Scala implementation more, because it looks slightly cleaner. I added Java as an afterthought, because the majority of Flink code is Java, but it is fine to separate the two. I think the example is essential here, because it shows how to use library. > > 2. I think the idea to carve out this implementation into: > - model layer; > - serving layer; > - queryable state API This is effectively the way code is currently structured. I am not sure separating this into 3 independent PR will make a lot of sense. They are just building blocks, collectively implementing the library. > is a very good idea! On a higher level, this should enable usage of any model in serving perspective, so the question is: does DataConverter / Model conformed with Ski-learn's pipeline definition (My read on this is: yes). My understanding is this is generic enough to support any "Model", for example the model defined in FLIP-39, but it might be a good idea for the authors to also take a look (CCed: @weihua, @shaoxuan) The basic idea of the implementation is model as data. As long as the model can be defined in the intermediate format, whether it is Tensorflow, PMML, PFA, ONNX and there is a “generic” model evaluator for this representation, this will work. Take a look at the package https://github.com/blublinsky/flink/tree/master/flink-modelserving/flink-modelserving-scala/src/main/scala/org/apache/flink/modelserving/scala/model <https://github.com/blublinsky/flink/tree/master/flink-modelserving/flink-modelserving-scala/src/main/scala/org/apache/flink/modelserving/scala/model>. It is completely generic, while https://github.com/blublinsky/flink/tree/master/flink-modelserving/flink-modelserving-scala/src/main/scala/org/apache/flink/modelserving/scala/model/tensorflow <https://github.com/blublinsky/flink/tree/master/flink-modelserving/flink-modelserving-scala/src/main/scala/org/apache/flink/modelserving/scala/model/tensorflow> adds tensor flow specific support. The generic implementation treats model as a byte array, while model specific code knows how this byte array should be processed for a specific model type. > > 3. Implementation > - following up with #1, can we move the architecture to > - model-server-base > - model-server-java/scala //specific to each language I was thinking about it and it is doable, but pretty ugly. Using Java library from Scala works, but it is not very pretty. Putting Java APIs on Scala code is even uglier. > There are many duplicate codes between the Java and Scala implementations that I think might be able to put them as a base, for example, implemented in Scala (since most of the basic math / ML packages[1][2] are all implemented Scala-based). > > - Type system of this PR does not use too much of Flink's type information (other than the ByteArray, which is a pass-through), maybe we can leverage this for the strong-type guarantee on the data processor side. (CCed @timo who might have some idea regarding the type system in a whole, as he is working on the table type system FLIP-37). My thoughts is, the model itself is protobuf thus must be in ByteArray, but some auxiliary data (stats, additional data type, record type, etc) can use Flink's type system. Implementation effectively implements Flink type system for the model. See https://github.com/blublinsky/flink/blob/master/flink-modelserving/flink-modelserving-scala/src/main/scala/org/apache/flink/modelserving/scala/server/typeschema/ModelTypeSerializer.scala <https://github.com/blublinsky/flink/blob/master/flink-modelserving/flink-modelserving-scala/src/main/scala/org/apache/flink/modelserving/scala/server/typeschema/ModelTypeSerializer.scala> and https://github.com/blublinsky/flink/blob/master/flink-modelserving/flink-modelserving-scala/src/main/scala/org/apache/flink/modelserving/scala/server/typeschema/ModelWithTypeSerializer.scala <https://github.com/blublinsky/flink/blob/master/flink-modelserving/flink-modelserving-scala/src/main/scala/org/apache/flink/modelserving/scala/server/typeschema/ModelWithTypeSerializer.scala> so I am not sure how much more Flink types can be used there. > > I will finish the review and add comments to the PR asap. > > Thanks, > Rong > > [1] https://github.com/scalanlp/breeze <https://github.com/scalanlp/breeze> > [2] https://spark.apache.org/docs/latest/ml-guide.html <https://spark.apache.org/docs/latest/ml-guide.html> > On Tue, Apr 30, 2019 at 2:33 AM Robert Metzger <[hidden email] <mailto:[hidden email]>> wrote: > Hey all, > > I'm wondering if somebody on the list can take a look at the PR from > FLIP-23: https://github.com/apache/flink/pull/7446 <https://github.com/apache/flink/pull/7446> > > > On Mon, Oct 1, 2018 at 6:13 PM Rong Rong <[hidden email] <mailto:[hidden email]>> wrote: > > > Thanks for the contribution Boris!! I've been playing around with the basic > > model for a while back and loved it. > > +1 and really looking forward to having the feature merging back to Flink > > ML. > > > > -- > > Rong > > > > On Mon, Oct 1, 2018 at 7:55 AM Fabian Hueske <[hidden email] <mailto:[hidden email]>> wrote: > > > > > Hi everybody, > > > > > > The question of how to serve ML models in Flink applications came up in > > > several conversations I had with Flink users in the last months. > > > Recently, Boris approached me and he told me that he'd like to revive the > > > efforts around FLIP-23 [1]. > > > > > > In the last days, Boris extended the proposal by a speculative model > > > evaluation which allows for evaluating multiple modes of varying > > complexity > > > to ensure certain SLAs. > > > The code does already exist in a Github repository [2]. > > > > > > Due to the frequent user requests and the fact that the code is already > > > present, I think would be a great feature for Flink to have. > > > Since this is a library on top of Flink's existing APIs this should not > > be > > > too hard to review. > > > > > > What do others think? > > > > > > Best, Fabian > > > > > > [1] > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving <https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving> > > > [2] https://github.com/FlinkML/flink-speculative-modelServer <https://github.com/FlinkML/flink-speculative-modelServer> > > > > > > Am Mo., 5. Feb. 2018 um 13:11 Uhr schrieb Stavros Kontopoulos < > > > [hidden email] <mailto:[hidden email]>>: > > > > > > > Thanx @Fabian. I will update the document accordingly wrt metrics. > > > > I agree there are pros and cons. > > > > > > > > Best, > > > > Stavros > > > > > > > > > > > > On Wed, Jan 31, 2018 at 1:07 AM, Fabian Hueske <[hidden email] <mailto:[hidden email]>> > > > wrote: > > > > > > > > > OK, I think there was plenty of time to comment on this FLIP. > > > > > I'll move it to the ACCEPTED status. > > > > > > > > > > @Stavros, please consider the feedback regarding the metrics. > > > > > I agree with Chesnay that metrics should be primarily exposed via the > > > > > metrics system. > > > > > Storing them in state makes them fault-tolerant and queryable if the > > > > state > > > > > is properly configured. > > > > > > > > > > Thanks, > > > > > Fabian > > > > > > > > > > 2018-01-22 17:19 GMT+01:00 Chesnay Schepler <[hidden email] <mailto:[hidden email]>>: > > > > > > > > > > > I'm currently looking over it, but one thing that stood out was > > that > > > > the > > > > > > FLIP proposes to use queryable state > > > > > > as a monitoring solution. Given that we have a metric system that > > > > > > integrates with plenty of commonly used > > > > > > metric backends this doesn't really make sense to me. > > > > > > > > > > > > Storing them in state still has value in terms of fault-tolerance > > > > though, > > > > > > since this is something that the metric > > > > > > system doesn't provide by itself. > > > > > > > > > > > > > > > > > > On 18.01.2018 13:57, Fabian Hueske wrote: > > > > > > > > > > > >> Are there any more comments on the FLIP? > > > > > >> > > > > > >> Otherwise, I'd suggest to move the FLIP to the accepted FLIPs [1] > > > and > > > > > >> continue with the implementation. > > > > > >> > > > > > >> Also, is there a committer who'd like to shepherd the FLIP and > > > review > > > > > the > > > > > >> corresponding PRs? > > > > > >> Of course, everybody is welcome to review the code but we need at > > > > least > > > > > >> one > > > > > >> committer who will eventually merge the changes. > > > > > >> > > > > > >> Best, > > > > > >> Fabian > > > > > >> > > > > > >> [1] > > > > > >> https://cwiki.apache.org/confluence/display/FLINK/Flink+ <https://cwiki.apache.org/confluence/display/FLINK/Flink+> > > > > > >> Improvement+Proposals > > > > > >> > > > > > >> 2017-12-04 10:54 GMT+01:00 Fabian Hueske <[hidden email] <mailto:[hidden email]>>: > > > > > >> > > > > > >> Hi, > > > > > >>> > > > > > >>> Sorry for the late follow up. > > > > > >>> > > > > > >>> I think I understand the motivation for choosing ProtoBuf as the > > > > > >>> representation and serialization format and this makes sense to > > me. > > > > > >>> > > > > > >>> However, it might be a good idea to provide tooling to convert > > > Flink > > > > > >>> types > > > > > >>> (described as TypeInformation) to ProtoBuf. > > > > > >>> Otherwise, users of the model serving library would need to > > > manually > > > > > >>> convert their data types (say Scala tuples, case classes, or Avro > > > > > Pojos) > > > > > >>> to > > > > > >>> ProtoBuf messages. > > > > > >>> I don't think that this needs to be included in the first version > > > but > > > > > it > > > > > >>> might be a good extension to make the library easier to use. > > > > > >>> > > > > > >>> Best, > > > > > >>> Fabian > > > > > >>> > > > > > >>> > > > > > >>> > > > > > >>> 2017-11-28 17:22 GMT+01:00 Boris Lublinsky < > > > > > >>> [hidden email] <mailto:[hidden email]>> > > > > > >>> : > > > > > >>> > > > > > >>> Thanks Fabian, > > > > > >>>> More below > > > > > >>>> > > > > > >>>> > > > > > >>>> > > > > > >>>> Boris Lublinsky > > > > > >>>> FDP Architect > > > > > >>>> [hidden email] <mailto:[hidden email]> > > > > > >>>> https://www.lightbend.com/ <https://www.lightbend.com/> > > > > > >>>> > > > > > >>>> On Nov 28, 2017, at 8:21 AM, Fabian Hueske <[hidden email] <mailto:[hidden email]>> > > > > wrote: > > > > > >>>> > > > > > >>>> Hi Boris and Stavros, > > > > > >>>> > > > > > >>>> Thanks for the responses. > > > > > >>>> > > > > > >>>> Ad 1) Thanks for the clarification. I think I misunderstood this > > > > part > > > > > of > > > > > >>>> the proposal. > > > > > >>>> I interpreted the argument why to chose ProtoBuf for network > > > > encoding > > > > > >>>> ("ability > > > > > >>>> to represent different data types") such that different a model > > > > > pipeline > > > > > >>>> should work on different data types. > > > > > >>>> I agree that it should be possible to give records of the same > > > type > > > > > (but > > > > > >>>> with different keys) to different models. The key-based join > > > > approach > > > > > >>>> looks > > > > > >>>> good to me. > > > > > >>>> > > > > > >>>> Ad 2) I understand that ProtoBuf is a good choice to serialize > > > > models > > > > > >>>> for > > > > > >>>> the given reasons. > > > > > >>>> However, the choice of ProtoBuf serialization for the records > > > might > > > > > make > > > > > >>>> the integration with existing libraries and also regular > > > DataStream > > > > > >>>> programs more difficult. > > > > > >>>> They all use Flink's TypeSerializer system to serialize and > > > > > deserialize > > > > > >>>> records by default. Hence, we would need to add a conversion > > step > > > > > before > > > > > >>>> records can be passed to a model serving operator. > > > > > >>>> Are you expecting some common format that all records follow > > (such > > > > as > > > > > a > > > > > >>>> Row or Vector type) or do you plan to support arbitrary records > > > such > > > > > as > > > > > >>>> Pojos? > > > > > >>>> If you plan for a specific type, you could add a TypeInformation > > > for > > > > > >>>> this > > > > > >>>> type with a TypeSerializer that is based on ProtoBuf. > > > > > >>>> > > > > > >>>> The way I look at it is slightly different. The common format > > for > > > > > >>>> records, supported by Flink, is Byte array with a little bit of > > > > > header, > > > > > >>>> describing data type and is used for routing. The actual > > > > unmarshalling > > > > > >>>> is > > > > > >>>> done by the model implementation itself. This provides the > > maximum > > > > > >>>> flexibility and gives user the freedom to create his own types > > > > without > > > > > >>>> breaking underlying framework. > > > > > >>>> > > > > > >>>> Ad 4) @Boris: I made this point not about the serialization > > format > > > > but > > > > > >>>> how the library would integrate with Flink's DataStream API. > > > > > >>>> I thought I had seen a code snippet that showed a new method on > > > the > > > > > >>>> DataStream object but cannot find this anymore. > > > > > >>>> So, I just wanted to make the point that we should not change > > the > > > > > >>>> DataStream API (unless it lacks support for some features) and > > > built > > > > > the > > > > > >>>> model serving library on top of it. > > > > > >>>> But I get from Stavros answer that this is your design anyway. > > > > > >>>> > > > > > >>>> Ad 5) The metrics system is the default way to expose system and > > > job > > > > > >>>> metrics in Flink. Due to the pluggable reporter interface and > > > > various > > > > > >>>> reporters, they can be easily integrated in many production > > > > > >>>> environments. > > > > > >>>> A solution based on queryable state will always need custom code > > > to > > > > > >>>> access the information. Of course this can be an optional > > feature. > > > > > >>>> > > > > > >>>> What do others think about this proposal? > > > > > >>>> > > > > > >>>> We had agreement among work group - Eron, Bas, Andrea, etc, but > > > you > > > > > are > > > > > >>>> the first one outside of it. My book https://www.lightbend.com <https://www.lightbend.com/> > > > > > >>>> /blog/serving-machine-learning-models-free-oreilly- > > > > > ebook-from-lightbend > > > > > >>>> has > > > > > >>>> > > > > > >>>> a reasonably good reviews, so we are hoping this will work > > > > > >>>> > > > > > >>>> > > > > > >>>> Best, Fabian > > > > > >>>> > > > > > >>>> > > > > > >>>> 2017-11-28 13:53 GMT+01:00 Stavros Kontopoulos < > > > > > >>>> [hidden email] <mailto:[hidden email]>> > > > > > >>>> : > > > > > >>>> > > > > > >>>> Hi Fabian thanx! > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> 1) Is it a strict requirement that a ML pipeline must be able > > to > > > > > handle > > > > > >>>>>> different input types? > > > > > >>>>>> I understand that it makes sense to have different models for > > > > > >>>>>> different > > > > > >>>>>> instances of the same type, i.e., same data type but different > > > > keys. > > > > > >>>>>> > > > > > >>>>> Hence, > > > > > >>>>> > > > > > >>>>>> the key-based joins make sense to me. However, couldn't > > > completely > > > > > >>>>>> different types be handled by different ML pipelines or would > > > > there > > > > > be > > > > > >>>>>> major drawbacks? > > > > > >>>>>> > > > > > >>>>> > > > > > >>>>> Could you elaborate more on this? Right now we only use keys > > when > > > > we > > > > > do > > > > > >>>>> the > > > > > >>>>> join. A given pipeline can handle only a well defined type (the > > > > type > > > > > >>>>> can > > > > > >>>>> be > > > > > >>>>> a simple string with a custom value, no need to be a > > > > > >>>>> class type) which serves as a key. > > > > > >>>>> > > > > > >>>>> 2) > > > > > >>>>> > > > > > >>>>> I think from an API point of view it would be better to not > > > require > > > > > >>>>> > > > > > >>>>>> input records to be encoded as ProtoBuf messages. Instead, the > > > > model > > > > > >>>>>> > > > > > >>>>> server > > > > > >>>>> > > > > > >>>>>> could accept strongly-typed objects (Java/Scala) and (if > > > > necessary) > > > > > >>>>>> > > > > > >>>>> convert > > > > > >>>>> > > > > > >>>>>> them to ProtoBuf messages internally. In case we need to > > support > > > > > >>>>>> > > > > > >>>>> different > > > > > >>>>> > > > > > >>>>>> types of records (see my first point), we can introduce a > > Union > > > > type > > > > > >>>>>> > > > > > >>>>> (i.e., > > > > > >>>>> > > > > > >>>>>> an n-ary Either type). I see that we need some kind of binary > > > > > encoding > > > > > >>>>>> format for the models but maybe also this can be designed to > > be > > > > > >>>>>> > > > > > >>>>> pluggable > > > > > >>>>> > > > > > >>>>>> such that later other encodings can be added. > > > > > >>>>>> > > > > > >>>>>> We do uses scala classes (strongly typed classes), protobuf > > is > > > > > only > > > > > >>>>> used > > > > > >>>>> on the wire. For on the wire encoding we prefer protobufs for > > > size, > > > > > >>>>> expressiveness and ability to represent different data types. > > > > > >>>>> > > > > > >>>>> 3) > > > > > >>>>> > > > > > >>>>> I think the DataStream Java API should be supported as a first > > > > class > > > > > >>>>> > > > > > >>>>>> citizens for this library. > > > > > >>>>>> > > > > > >>>>> > > > > > >>>>> I agree. It should be either first priority or a next thing to > > > do. > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> 4) > > > > > >>>>> > > > > > >>>>> For the integration with the DataStream API, we could provide > > an > > > > API > > > > > >>>>> that > > > > > >>>>> > > > > > >>>>>> receives (typed) DataStream objects, internally constructs the > > > > > >>>>>> > > > > > >>>>> DataStream > > > > > >>>>> > > > > > >>>>>> operators, and returns one (or more) result DataStreams. The > > > > benefit > > > > > >>>>>> is > > > > > >>>>>> that we don't need to change the DataStream API directly, but > > > put > > > > a > > > > > >>>>>> > > > > > >>>>> library > > > > > >>>>> > > > > > >>>>>> on top. The other libraries (CEP, Table, Gelly) follow this > > > > > approach. > > > > > >>>>>> > > > > > >>>>> > > > > > >>>>> We will provide a DSL which will do jsut this. But even > > without > > > > the > > > > > >>>>> DSL > > > > > >>>>> this is what we do with low level joins. > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> 5) > > > > > >>>>> > > > > > >>>>> I'm skeptical about using queryable state to expose metrics. > > Did > > > > you > > > > > >>>>>> consider using Flink's metrics system [1]? It is easily > > > > configurable > > > > > >>>>>> > > > > > >>>>> and we > > > > > >>>>> > > > > > >>>>>> provided several reporters that export the metrics. > > > > > >>>>>> > > > > > >>>>>> This of course is an option. The choice of queryable state was > > > > > mostly > > > > > >>>>> driven by a simplicity of real time integration. Any reason > > why > > > > > >>>>> metrics > > > > > >>>>> system is netter? > > > > > >>>>> > > > > > >>>>> > > > > > >>>>> Best, > > > > > >>>>> Stavros > > > > > >>>>> > > > > > >>>>> On Mon, Nov 27, 2017 at 4:23 PM, Fabian Hueske < > > > [hidden email] <mailto:[hidden email]>> > > > > > >>>>> wrote: > > > > > >>>>> > > > > > >>>>> Hi Stavros, > > > > > >>>>>> > > > > > >>>>>> thanks for the detailed FLIP! > > > > > >>>>>> Model serving is an important use case and it's great to see > > > > efforts > > > > > >>>>>> > > > > > >>>>> to add > > > > > >>>>> > > > > > >>>>>> a library for this to Flink! > > > > > >>>>>> > > > > > >>>>>> I've read the FLIP and would like to ask a few questions and > > > make > > > > > some > > > > > >>>>>> suggestions. > > > > > >>>>>> > > > > > >>>>>> 1) Is it a strict requirement that a ML pipeline must be able > > to > > > > > >>>>>> handle > > > > > >>>>>> different input types? > > > > > >>>>>> I understand that it makes sense to have different models for > > > > > >>>>>> different > > > > > >>>>>> instances of the same type, i.e., same data type but different > > > > keys. > > > > > >>>>>> > > > > > >>>>> Hence, > > > > > >>>>> > > > > > >>>>>> the key-based joins make sense to me. However, couldn't > > > completely > > > > > >>>>>> different types be handled by different ML pipelines or would > > > > there > > > > > be > > > > > >>>>>> major drawbacks? > > > > > >>>>>> > > > > > >>>>>> 2) I think from an API point of view it would be better to not > > > > > require > > > > > >>>>>> input records to be encoded as ProtoBuf messages. Instead, the > > > > model > > > > > >>>>>> > > > > > >>>>> server > > > > > >>>>> > > > > > >>>>>> could accept strongly-typed objects (Java/Scala) and (if > > > > necessary) > > > > > >>>>>> > > > > > >>>>> convert > > > > > >>>>> > > > > > >>>>>> them to ProtoBuf messages internally. In case we need to > > support > > > > > >>>>>> > > > > > >>>>> different > > > > > >>>>> > > > > > >>>>>> types of records (see my first point), we can introduce a > > Union > > > > type > > > > > >>>>>> > > > > > >>>>> (i.e., > > > > > >>>>> > > > > > >>>>>> an n-ary Either type). I see that we need some kind of binary > > > > > encoding > > > > > >>>>>> format for the models but maybe also this can be designed to > > be > > > > > >>>>>> > > > > > >>>>> pluggable > > > > > >>>>> > > > > > >>>>>> such that later other encodings can be added. > > > > > >>>>>> > > > > > >>>>>> 3) I think the DataStream Java API should be supported as a > > > first > > > > > >>>>>> class > > > > > >>>>>> citizens for this library. > > > > > >>>>>> > > > > > >>>>>> 4) For the integration with the DataStream API, we could > > provide > > > > an > > > > > >>>>>> API > > > > > >>>>>> that receives (typed) DataStream objects, internally > > constructs > > > > the > > > > > >>>>>> DataStream operators, and returns one (or more) result > > > > DataStreams. > > > > > >>>>>> The > > > > > >>>>>> benefit is that we don't need to change the DataStream API > > > > directly, > > > > > >>>>>> > > > > > >>>>> but > > > > > >>>>> > > > > > >>>>>> put a library on top. The other libraries (CEP, Table, Gelly) > > > > follow > > > > > >>>>>> > > > > > >>>>> this > > > > > >>>>> > > > > > >>>>>> approach. > > > > > >>>>>> > > > > > >>>>>> 5) I'm skeptical about using queryable state to expose > > metrics. > > > > Did > > > > > >>>>>> you > > > > > >>>>>> consider using Flink's metrics system [1]? It is easily > > > > configurable > > > > > >>>>>> > > > > > >>>>> and we > > > > > >>>>> > > > > > >>>>>> provided several reporters that export the metrics. > > > > > >>>>>> > > > > > >>>>>> What do you think? > > > > > >>>>>> Best, Fabian > > > > > >>>>>> > > > > > >>>>>> [1] > > > > > >>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/ <https://ci.apache.org/projects/flink/flink-docs-release-1.3/> > > > > > >>>>>> > > > > > >>>>> monitoring/ > > > > > >>>>> > > > > > >>>>>> metrics.html > > > > > >>>>>> > > > > > >>>>>> 2017-11-23 12:32 GMT+01:00 Stavros Kontopoulos < > > > > > >>>>>> > > > > > >>>>> [hidden email] <mailto:[hidden email]>>: > > > > > >>>>> > > > > > >>>>>> Hi guys, > > > > > >>>>>>> > > > > > >>>>>>> Let's discuss the new FLIP proposal for model serving over > > > Flink. > > > > > The > > > > > >>>>>>> > > > > > >>>>>> idea > > > > > >>>>>> > > > > > >>>>>>> is to combine previous efforts there and provide a library on > > > top > > > > > of > > > > > >>>>>>> > > > > > >>>>>> Flink > > > > > >>>>>> > > > > > >>>>>>> for serving models. > > > > > >>>>>>> > > > > > >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP- <https://cwiki.apache.org/confluence/display/FLINK/FLIP-> > > > > > >>>>>>> > > > > > >>>>>> 23+-+Model+Serving > > > > > >>>>>> > > > > > >>>>>>> Code from previous efforts can be found here: > > > > > >>>>>>> > > > > > >>>>>> https://github.com/FlinkML <https://github.com/FlinkML> > > > > > >>>>> > > > > > >>>>>> Best, > > > > > >>>>>>> Stavros > > > > > >>>>>>> > > > > > >>>>>>> > > > > > >>>> > > > > > >>>> > > > > > > > > > > > > > > > > > > > > |
Free forum by Nabble | Edit this page |