[DISCUSS] FLIP-38 Support python language in flink TableAPI


Re: [DISCUSS] FLIP-38 Support python language in flink TableAPI

Stephan Ewen
Hi all!

Below are my notes on the discussion last week on how to collaborate
between Beam and Flink.
The discussion was between Tyler, Kenn, Luke, Ahmed, Xiaowei, Shaoxuan,
Jincheng, and me.

This represents my understanding of the discussion, please augment this
where I missed something or where your conclusion was different.

Best,
Stephan

=======================================================

*Beam's Python and Portability Framework*

  - Portability is core to Beam
  - Language-independent dataflow DAG that is defined via ProtoBuf
  - The DAG can be generated from various languages (Java, Python, Go); a
minimal sketch follows after this list.
  - The DAG describes the pipeline and contains additional parameters to
describe each operator, as well as artifacts that need to be deployed /
executed as part of an operator's execution.
  - Operators execute in language-specific containers; data is exchanged
between the language-specific container and the runner container (JVM) via
gRPC.
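
A minimal sketch of how such a language-independent DAG is obtained from a
Python-defined pipeline, assuming a local Apache Beam Python SDK install (the
pipeline itself is just an illustrative example):

    # Sketch: a Beam Python pipeline and the ProtoBuf DAG a portable runner receives.
    import apache_beam as beam

    p = beam.Pipeline()
    (p
     | "Create" >> beam.Create([1, 2, 3])
     | "Square" >> beam.Map(lambda x: x * x))  # opaque Python UDF from the runner's view

    # to_runner_api() produces the language-independent ProtoBuf pipeline; each
    # transform in it carries a spec describing the operator plus references to
    # the environments/artifacts needed to execute it.
    pipeline_proto = p.to_runner_api()
    print(list(pipeline_proto.components.transforms.keys()))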

*Flink's desiderata for Python API*

  - The Python API should mirror the Java / Scala Table API (a hypothetical
sketch follows after this list)
  - All relational expressions that correspond to built-in functions should
be translated to corresponding expressions in the Table API. That way the
planner generates Java code for the data types and built-in expressions,
meaning no Python code is necessary during execution
  - UDFs should be supported and run similarly to Beam's approach
  - Python programs should be created and submitted/deployed in the same way
as Java / Scala programs (CLI, web, containerized, etc.)
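
A purely hypothetical sketch of what the first point could look like from the
Python side. None of these classes existed at the time; the names
(TableEnvironment, TableConfig, scan, group_by, ...) are assumptions that
mirror the Java Table API rather than a finalized design:

    # Hypothetical sketch only: a Python program mirroring the Java/Scala Table API.
    # All class and method names below are illustrative assumptions.
    t_env = TableEnvironment.get_table_environment(TableConfig())  # assumed factory

    orders = t_env.scan("Orders")               # relational source, as in Java's scan()
    result = (orders
              .filter("amount > 10")            # built-in expressions: translated to
              .group_by("user")                 # Table API expressions, so the planner
              .select("user, amount.sum as total"))  # generates Java code and no Python
    result.insert_into("Results")                    # runs during execution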

*Consensus to share inter-process communication code*

  - Crucial code for robust setup and high-performance data exchange across
processes
  - The code for the SDK harness, the artifact bootstrapping, and the data
exchange makes sense to share.
  - Ongoing discussion whether this can be a dedicated module with slim
dependencies in Beam

*Potential Long Term Perspective: Share language-independent DAG
representation*

  - Beam's language-independent DAG could become a standard representation
used in both projects
  - Flink would need a way to receive that DAG, map it to the Table API, and
execute it from there
  - The DAG would need to have a standardized representation of functions
and expressions that then get mapped to Table API expressions to let the
planner optimize those and generate Java code for those
  - Similar to how UDFs are supported in the Table API, there would be
additional "external UDFs" that would go through the above-mentioned
inter-process communication layer (a hypothetical sketch follows at the end
of this section)

  - *Advantages:*
    => Flink and Beam could share more language bindings
    => Flink would execute Beam portability programs fast, without
intermediate abstraction and directly in the JVM for many operators.
         Abstraction is necessary around UDFs and to bridge between
serializers / coders, etc.

  - *Open issues:*
    => Biggest question is whether the language-independent DAG is
expressive enough to capture all the expressions that we want to map
directly to Table API expressions. Currently much is hidden in opaque UDFs.
Kenn mentioned the structure should be flexible enough to capture more
expressions transparently.

    => If the DAG is generic enough to capture the additional information,
we probably still need some standardization, so that all the different
language APIs represent their expressions the same way
    => Similarly, it makes sense to standardize the type system (and type
inference) as far as built-in expressions and their interaction with UDFs
are concerned. The Flink Table API and Blink teams found this to be
essential for a consistent API behavior. This would not prevent all-UDF
programs from still using purely binary/opaque types.

 =>  We need to create a Python API that follows the same structure as
Flink's Table API and produces the language-independent DAG
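
To illustrate the "external UDF" idea from the user's perspective, a purely
hypothetical sketch; the udf decorator and register_function call are
illustrative assumptions, not part of any agreed design. The Python function
body would run in the SDK harness and exchange data with the JVM over the
shared inter-process layer, while the surrounding relational expressions
remain planner-generated Java code:

    # Hypothetical sketch of an "external UDF" in a Python Table API.
    # The decorator and registration API are illustrative assumptions;
    # t_env / orders are as in the hypothetical Table API sketch above.
    @udf(result_type="STRING")
    def normalize_region(region):
        return region.strip().lower()    # executes in the Python SDK harness

    t_env.register_function("normalize_region", normalize_region)
    orders.select("user, normalize_region(region) as region, amount")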

*Short-term approach in Flink*

  - Goal is to not block Flink's Python effort on the long term approach
and the necessary design and evolution of the language-independent DAG.
  - Depending on the outcome of the above investigation, Flink may initially
go with a simple approach that maps the Python Table API to the Java Table
API via Py4J, as outlined in FLIP-38:
https://docs.google.com/document/d/1ybYt-0xWRMa1Yf5VsuqGRtOfJBz4p74ZmDxZYg3j_h8
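
A rough sketch of the Py4J mechanics behind that simple approach, assuming a
JVM-side GatewayServer started by the Flink client that exposes a
TableEnvironment as its entry point (the entry-point method name below is an
assumption for illustration):

    # Sketch: Python-side thin wrappers forwarding calls to the Java Table API via Py4J.
    from py4j.java_gateway import JavaGateway

    gateway = JavaGateway()                              # connect to the local JVM gateway
    j_tenv = gateway.entry_point.getTableEnvironment()   # hypothetical entry-point method
    j_table = j_tenv.scan("Orders")                      # every call executes in the JVM
    j_result = j_table.filter("amount > 10").select("user, amount")
    # A Python Table wrapper class would hold objects like j_result and mirror
    # their methods, so users never touch Py4J objects directly.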



On Tue, Apr 23, 2019 at 4:14 AM jincheng sun <[hidden email]>
wrote:

> Hi everyone,
>
> Thank you for all of your feedback and comments in google doc!
>
> I have updated the google doc and added the UDFs part. For a short summary:
>
>   - Python Table API - Flink introduces a set of Python Table API interfaces
> which align with the Flink Java Table API. It uses the Py4J framework to
> communicate between the Python VM and the Java VM.
>   - Python User-defined functions - IMO, Flink should support a communication
> framework for UDFs; we will try to reuse the existing achievements of Beam
> as much as possible, and do our best for this. The first step is
> to solve the above interface definition problem, i.e. turning
> `WindowedValue<T>` into `T` in the FnDataService and BeamFnDataClient
> interface definitions; this has been discussed in the Beam community.
>
> The details can be found here:
>
> https://docs.google.com/document/d/1ybYt-0xWRMa1Yf5VsuqGRtOfJBz4p74ZmDxZYg3j_h8/edit?usp=sharing
>
> So we can start the development of Table API without UDFs in Flink, and
> work with the Beam community to promote the abstraction of Beam.
>
> What do you think?
>
> Regards,
> Jincheng
>
> jincheng sun <[hidden email]> 于2019年4月17日周三 下午4:01写道:
>
> > Hi Stephan,
> >
> > Thanks for your suggestions and summary. :)
> >
> >      ==> The FLIP should probably reflect the full goal rather than the
> >> first implementation step only, this would make sure everyone
> understands
> >> what the final goal of the effort is.
> >
> >
> > I totally agree that we can implement the functionality in stages, but the
> > FLIP needs to reflect the full final goal. I agree with Thomas and you, and
> > I will add the design of the UDF part later.
> >
> > Yes, you are right: currently we only consider `flink run` and
> > `python-shell` as the job entry points, and we should add a REST API as
> > another entry point.
> >
> > It would be super cool if the Python API would work seamlessly with all
> >> modes of starting Flink jobs.
> >
> >
> > If I understand you correctly, to support the Python Table API in
> > Kubernetes we only need to add (or improve the existing) REST API
> > corresponding to the Python Table API; of course, we may also need to
> > release a Docker image that supports Python, which would make it easy to
> > deploy the Python Table API into Kubernetes.
> >
> > So, finally, we would support the following ways to submit Python Table
> > API jobs:
> > - Python Shell - interactive development.
> > - CLI - submit the job by `flink run`. e.g: deploy job into the yarn
> > cluster.
> > - REST - submit the job by REST API. e.g: deploy job into the kubernetes
> > cluster.
> >
> > Please correct me if I have misunderstood anything.
> >
> > Thanks,
> > Jincheng
> >
> >
> > Stephan Ewen <[hidden email]> 于2019年4月12日周五 上午12:22写道:
> >
> >> One more thought:
> >>
> >> The FLIP is very much centered on the CLI and it looks like it has
> mainly
> >> batch jobs and session clusters in mind.
> >>
> >> In very many cases, especially in streaming cases, the CLI (or shell) is
> >> not the entry point for a program.
> >> See for example the use of Flink jobs on Kubernetes (Container Mode /
> >> Entrypoint).
> >>
> >> It would be super cool if the Python API would work seamlessly with all
> >> modes of starting Flink jobs.
> >> That would make it available to all users.
> >>
> >> On Thu, Apr 11, 2019 at 5:34 PM Stephan Ewen <[hidden email]> wrote:
> >>
> >> > Hi all!
> >> >
> >> > I think that all the opinions and ideas are not actually in conflict,
> so
> >> > let me summarize what I understand is the proposal:
> >> >
> >> > *(1) Long-term goal: Full Python Table API with UDFs*
> >> >
> >> >      To break the implementation effort up into stages, the first step
> >> > would be the API without UDFs.
> >> >       Because of all the built-in functions in the Table API, this can
> >> > already exist by itself, with some value, but ultimately is quite
> >> limited
> >> > without UDF support.
> >> >
> >> >      ==> The FLIP should probably reflect the full goal rather than
> the
> >> > first implementation step only, this would make sure everyone
> >> understands
> >> > what the final goal of the effort is.
> >> >
> >> >
> >> > *(2) Relationship to Beam Language Portability*
> >> >
> >> > Flink's own Python Table API and Beam-Python on Flink add different
> >> value
> >> > and are both attractive for different scenarios.
> >> >
> >> >   - Beam's Python API supports complex pipelines in a similar style as
> >> the
> >> > DataStream API. There is also the ecosystem of libraries built on top
> >> > of that DSL, for example for machine learning.
> >> >
> >> >   - Flink's Python Table API builds mostly relational expressions,
> plus
> >> > some UDFs. Most of the Python code never executes in Python, though.
> It
> >> is
> >> > geared at use cases similar to Flink's Table API.
> >> >
> >> > Both approaches mainly differ in how the streaming DAG is built from
> >> > Python code and received by the JVM.
> >> >
> >> > In previous discussions, we concluded that for inter process data
> >> exchange
> >> > (JVM <> Python), we want to share code with Beam.
> >> > That part is possibly the most crucial piece to getting performance
> out
> >> of
> >> > the Python DSL, so will benefit from sharing development,
> optimizations,
> >> > etc.
> >> >
> >> > Best,
> >> > Stephan
> >> >
> >> >
> >> >
> >> >
> >> > On Fri, Apr 5, 2019 at 5:25 PM jincheng sun <[hidden email]
> >
> >> > wrote:
> >> >
> >> >> One more thing: it's better to mention that the Flink Table API is a
> >> >> superset of Flink SQL, with features such as:
> >> >> - AddColumns/DropColumns/RenameColumns, the details can be found in the
> >> >> Google doc
> >> >> <
> >> >>
> >>
> https://docs.google.com/document/d/1tryl6swt1K1pw7yvv5pdvFXSxfrBZ3_OkOObymis2ck/edit#heading=h.7rwcjbvr52dc
> >> >> >
> >> >> - Interactive Programming in Flink Table API, the detail can be found
> >> in
> >> >> FLIP-36
> >> >> <
> >> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> >> >> >
> >> >> I think in the future, more and more features that cannot be expressed
> >> >> in SQL will be added to the Table API.
> >> >>
> >> >> Thomas Weise <[hidden email]> 于2019年4月5日周五 下午12:11写道:
> >> >>
> >> >> > Hi Jincheng,
> >> >> >
> >> >> > >
> >> >> > > Yes, we can add use case examples in both the google doc and the
> >> >> > > FLIP. I have already added simple usage to the google doc; here I
> >> >> > > want to know which kind of examples you want? :)
> >> >> > >
> >> >> >
> >> >> > Do you have use cases where the Python table API can be applied
> >> without
> >> >> UDF
> >> >> > support?
> >> >> >
> >> >> > (And where the same could not be accomplished with just SQL.)
> >> >> >
> >> >> >
> >> >> > > The very short answer to UDF support is Yes. As you said, we need
> >> >> > > UDF support in the Python Table API, including UDF, UDTF, and UDAF.
> >> >> > > This needs to be discussed after the basic Python Table API is
> >> >> > > supported, because UDFs involve management of the Python
> >> >> > > environment, runtime-level communication between the Java and
> >> >> > > Python runtimes, and UDAFs in Flink also involve the use of State,
> >> >> > > so this is a topic that is worth discussing in depth in a separate
> >> >> > > thread.
> >> >> > >
> >> >> >
> >> >> > The current proposal for job submission touches something that Beam
> >> >> > portability already had to solve.
> >> >> >
> >> >> > If we think that the Python table API will only be useful with UDF
> >> >> support
> >> >> > (question above), then it may be better to discuss the first step
> >> with
> >> >> the
> >> >> > final goal in mind. If we find that Beam can be used for the UDF
> part
> >> >> then
> >> >> > approach 1 vs. approach 2 in the doc (for the client side language
> >> >> > boundary) may look different.
> >> >> >
> >> >> >
> >> >> > >
> >> >> > > I think that no matter how Flink and Beam work together on the UDF
> >> >> > > level, it will not affect the current Python API (interface); we
> >> >> > > can first support the Python API in Flink and then start the UDX
> >> >> > > (UDF/UDTF/UDAF) support.
> >> >> > >
> >> >> > >
> >> >> > I agree that the client side API should not be affected.
> >> >> >
> >> >> >
> >> >> > > And great thanks for your valuable comments in the Google doc! I
> >> >> > > will give you feedback in the Google doc. :)
> >> >> > >
> >> >> > >
> >> >> > > Regards,
> >> >> > > Jincheng
> >> >> > >
> >> >> > > Thomas Weise <[hidden email]> 于2019年4月4日周四 上午8:03写道:
> >> >> > >
> >> >> > > > Thanks for putting this proposal together.
> >> >> > > >
> >> >> > > > It would be nice, if you could share a few use case examples
> >> (maybe
> >> >> add
> >> >> > > > them as section to the FLIP?).
> >> >> > > >
> >> >> > > > The reason I ask: The table API is immensely useful, but it
> isn't
> >> >> clear
> >> >> > > to
> >> >> > > > me what value other language bindings provide without UDF
> >> support.
> >> >> With
> >> >> > > > FLIP-38 it will be possible to write a program in Python, but
> not
> >> >> > execute
> >> >> > > > Python functions. Without UDF support, isn't it possible to
> >> achieve
> >> >> > > roughly
> >> >> > > > the same with plain SQL? In which situation would I use the
> >> Python
> >> >> API?
> >> >> > > >
> >> >> > > > There was related discussion regarding UDF support in [1]. If
> the
> >> >> > > > assumption is that such support will be added later, then I
> would
> >> >> like
> >> >> > to
> >> >> > > > circle back to the question why this cannot be built on top of
> >> >> Beam? It
> >> >> > > > would be nice to clarify the bigger goal before embarking for
> the
> >> >> first
> >> >> > > > milestone.
> >> >> > > >
> >> >> > > > I'm going to comment on other things in the doc.
> >> >> > > >
> >> >> > > > [1]
> >> >> > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> >>
> https://lists.apache.org/thread.html/f6f8116b4b38b0b2d70ed45b990d6bb1bcb33611fde6fdf32ec0e840@%3Cdev.flink.apache.org%3E
> >> >> > > >
> >> >> > > > Thomas
> >> >> > > >
> >> >> > > >
> >> >> > > > On Wed, Apr 3, 2019 at 12:35 PM Shuyi Chen <[hidden email]
> >
> >> >> wrote:
> >> >> > > >
> >> >> > > > > Thanks a lot for driving the FLIP, jincheng. The approach looks
> >> >> > > > > good. Adding multi-language support sounds like a promising
> >> >> > > > > direction to expand the footprint of Flink. Do we have a plan
> >> >> > > > > for adding Golang support? As many backend engineers nowadays
> >> >> > > > > are familiar with Go, but probably not as much with Java, adding
> >> >> > > > > Golang support would significantly reduce their friction in
> >> >> > > > > using Flink. Also, do we have a design for multi-language UDF
> >> >> > > > > support, and what's the timeline for adding DataStream API
> >> >> > > > > support? We would like to help and contribute as well, as we do
> >> >> > > > > have a similar need internally at our company.
> >> >> > > > > Thanks a lot.
> >> >> > > > >
> >> >> > > > > Shuyi
> >> >> > > > >
> >> >> > > > > On Tue, Apr 2, 2019 at 1:03 AM jincheng sun <
> >> >> > [hidden email]>
> >> >> > > > > wrote:
> >> >> > > > >
> >> >> > > > > > Hi All,
> >> >> > > > > > As Xianda brought up in the previous email, there are a large
> >> >> > > > > > number of data analysis users who want Flink to support
> >> >> > > > > > Python. At the Flink API level, we have DataStream API /
> >> >> > > > > > DataSet API / Table API & SQL, and the Table API will become
> >> >> > > > > > the first-class citizen. The Table API is declarative and can
> >> >> > > > > > be automatically optimized, which is mentioned in the Flink
> >> >> > > > > > mid-term roadmap by Stephan. So we are first considering
> >> >> > > > > > supporting Python at the Table level to cater to the current
> >> >> > > > > > large number of analytics users. To further promote Python
> >> >> > > > > > support at the Flink Table level, Dian, Wei and I discussed
> >> >> > > > > > offline a bit and came up with an initial feature outline as
> >> >> > > > > > follows:
> >> >> > > > > >
> >> >> > > > > > - Python TableAPI Interface
> >> >> > > > > >   Introduce a set of Python Table API interfaces, including
> >> >> > interface
> >> >> > > > > > definitions such as Table, TableEnvironment, TableConfig,
> >> etc.
> >> >> > > > > >
> >> >> > > > > > - Implementation Architecture
> >> >> > > > > >   We will offer two alternative architecture options, one
> for
> >> >> pure
> >> >> > > > Python
> >> >> > > > > > language support and one for extended multi-language
> design.
> >> >> > > > > >
> >> >> > > > > > - Job Submission
> >> >> > > > > >   Provide a way to submit (local/remote) Python Table API
> >> >> > > > > > jobs.
> >> >> > > > > >
> >> >> > > > > > - Python Shell
> >> >> > > > > >   The Python Shell provides an interactive way for users to
> >> >> > > > > > write and execute Flink Python Table API jobs.
> >> >> > > > > >
> >> >> > > > > >
> >> >> > > > > > The design document for FLIP-38 can be found here:
> >> >> > > > > >
> >> >> > > > > >
> >> >> > > > > >
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> >>
> https://docs.google.com/document/d/1ybYt-0xWRMa1Yf5VsuqGRtOfJBz4p74ZmDxZYg3j_h8/edit?usp=sharing
> >> >> > > > > >
> >> >> > > > > > I am looking forward to your comments and feedback.
> >> >> > > > > >
> >> >> > > > > > Best,
> >> >> > > > > > Jincheng
> >> >> > > > > >
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> >> >
> >>
> >
>

Re: [DISCUSS] FLIP-38 Support python language in flink TableAPI

jincheng sun
Hi Stephan,

Thanks for your summary; from my point of view, we are on the same page about
the conclusions of the discussion!

I completely agree that we can divide the support of the Python Table API
into short-term and long-term goals, and that the short-term design should be
able to evolve smoothly into the long-term goals.
And we will also continue to communicate with the Beam community to achieve
the long-term goals.

Our hope is that Flink 1.9 can support the Python Table API, so I am
preparing to create FLIP-38 in the Flink Confluence and to open the first PR
of the Python Table API. Of course, we can continue the discussion in the
Google doc and the mail thread for any design points that have not yet
reached consensus. Does that make sense to you?

Regards,
Jincheng


Re: [DISCUSS] FLIP-38 Support python language in flink TableAPI

mxm
In reply to this post by Stephan Ewen
Hi Stephan,

This is exciting! Thanks for sharing. The inter-process communication code
looks like the most natural choice as common ground. To go further, there are
indeed some challenges to solve.

> => Biggest question is whether the language-independent DAG is expressive enough to capture all the expressions that we want to map directly to Table API expressions. Currently much is hidden in opaque UDFs. Kenn mentioned the structure should be flexible enough to capture more expressions transparently.

Just to add some context on how this could be done: there is the concept of
a FunctionSpec, which is part of a transform in the DAG. A FunctionSpec
contains a URN and a payload (see the sketch below). A FunctionSpec can
either be (1) translated by the Runner directly, e.g. mapped to Table API
concepts, or (2) used to run a user-defined function with an Environment. It
could be feasible for Flink to choose the direct path, whereas Beam Runners
would leverage the more generic approach using UDFs. Granted, compatibility
across Flink and Beam would only work if both of the translation paths
yielded the same semantics.
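
For reference, a minimal sketch of constructing such a FunctionSpec from
Python, assuming an Apache Beam install (the proto module path may vary
across Beam versions, and the URN/payload below are purely illustrative):

    # Sketch: a FunctionSpec as carried by a transform in the portable pipeline proto.
    from apache_beam.portability.api import beam_runner_api_pb2

    spec = beam_runner_api_pb2.FunctionSpec(
        urn="beam:transform:flink:table_filter:v1",  # hypothetical URN a runner could
        payload=b"amount > 10",                      # translate directly (path 1); an
    )                                                # unknown URN falls back to path 2
    print(spec.urn, spec.payload)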

>  If the DAG is generic enough to capture the additional information, we probably still need some standardization, so that all the different language APIs represent their expressions the same way

I wonder whether that's necessary as a first step. I think it would be
fine for Flink to have its own way to represent API concepts in the Beam
DAG which Beam Runners may not be able to understand. We could then
successively add the capability for these transforms to run with Beam.

>  Similarly, it makes sense to standardize the type system (and type inference) as far as built-in expressions and their interaction with UDFs are concerned. The Flink Table API and Blink teams found this to be essential for a consistent API behavior. This would not prevent all-UDF programs from still using purely binary/opaque types.

Beam has a set of standard coders which can be used across languages. We
will have to expand those to play well with Flink's:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#data-types
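
As a small concrete example, assuming an Apache Beam Python install, one of
these standard coders in action:

    # Sketch: Beam's standard VarIntCoder yields the same byte encoding in every
    # SDK language; this is the kind of contract that would need to line up with
    # Flink's Table API data types and serializers.
    from apache_beam.coders import VarIntCoder

    coder = VarIntCoder()
    encoded = coder.encode(42)           # language-independent byte representation
    assert coder.decode(encoded) == 42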

I think we will need to exchange more ideas to work out a model that
will work for both Flink and Beam. A regular meeting could be helpful.

Thanks,
Max

>     Python Table
>      >> API
>      >> >> > jobs.
>      >> >> > > > > >
>      >> >> > > > > > - Python Shell
>      >> >> > > > > >   Python Shell is to provide an interactive way for
>     users to
>      >> >> write
>      >> >> > > and
>      >> >> > > > > > execute flink Python Table API jobs.
>      >> >> > > > > >
>      >> >> > > > > >
>      >> >> > > > > > The design document for FLIP-38 can be found here:
>      >> >> > > > > >
>      >> >> > > > > >
>      >> >> > > > > >
>      >> >> > > > >
>      >> >> > > >
>      >> >> > >
>      >> >> >
>      >> >>
>      >>
>     https://docs.google.com/document/d/1ybYt-0xWRMa1Yf5VsuqGRtOfJBz4p74ZmDxZYg3j_h8/edit?usp=sharing
>      >> >> > > > > >
>      >> >> > > > > > I am looking forward to your comments and feedback.
>      >> >> > > > > >
>      >> >> > > > > > Best,
>      >> >> > > > > > Jincheng
>      >> >> > > > > >
>      >> >> > > > >
>      >> >> > > >
>      >> >> > >
>      >> >> >
>      >> >>
>      >> >
>      >>
>      >
>

Re: [DISCUSS] FLIP-38 Support python language in flink TableAPI

Robert Bradshaw
Thanks for the meeting summary, Stephan. Sounds like you covered a lot of
ground. Some more comments below, adding onto what Max has said.

On Wed, Apr 24, 2019 at 3:20 PM Maximilian Michels <[hidden email]> wrote:
>
> Hi Stephan,
>
> This is exciting! Thanks for sharing. The inter-process communication
> code looks like the most natural choice as a common ground. To go
> further, there are indeed some challenges to solve.

It certainly does make sense to share this work, though it does to me seem
like a rather low level to integrate at.

> > => Biggest question is whether the language-independent DAG is
expressive enough to capture all the expressions that we want to map
directly to Table API expressions. Currently much is hidden in opaque UDFs.
Kenn mentioned the structure should be flexible enough to capture more
expressions transparently.

>
> Just to add some context how this could be done, there is the concept of
> a FunctionSpec which is part of a transform in the DAG. FunctionSpec
> contains a URN and a payload. FunctionSpec can be either (1)
> translated by the Runner directly, e.g. map to table API concepts or (2)
> run a user-defined function with an Environment. It could be feasible
> for Flink to choose the direct path, whereas Beam Runners would leverage
> the more generic approach using UDFs. Granted, compatibility across
> Flink and Beam would only work if both of the translation paths yielded
> the same semantics.

To elaborate a bit on this, Beam DAGs are built up by applying Transforms
(basically operations) to PCollections (the equivalent of
dataset/datastream), but the key point here is that these transforms are
often composite operations that expand out into smaller subtransforms. This
expansion happens during pipeline construction, but with the recent work on
cross language pipelines can happen out of process. This is one point of
extendability. Secondly, and importantly, this composite structure is
preserved in the DAG, and so a runner is free to ignore the provided
expansion and supply its own (so long as semantically it produces exactly
the same output). These composite operations can be identified by arbitrary
URNs + payloads, and any runner that does not understand them simply uses
the pre-provided expansion.
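
To make this concrete, here is a minimal sketch in the Beam Python SDK (the
transform and step names are made up, and the URN/payload registration is
omitted) of a composite whose expansion a runner is free to keep or replace:

    import apache_beam as beam

    class NormalizeAndCount(beam.PTransform):
        # A composite: the DAG records this node together with its expansion
        # into the smaller subtransforms below, so a runner that recognizes it
        # (e.g. by URN) can substitute its own implementation, while any other
        # runner simply executes the pre-provided expansion.
        def expand(self, pcoll):
            return (pcoll
                    | 'Normalize' >> beam.Map(lambda kv: (kv[0].lower(), kv[1]))
                    | 'CountPerKey' >> beam.CombinePerKey(sum))

    with beam.Pipeline() as p:
        counts = (p
                  | beam.Create([('A', 1), ('a', 2), ('B', 3)])
                  | NormalizeAndCount())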

The existing Flink runner operates on exactly this principle, translating
URNs for the leaf operations (Map, Flatten, ...) as well as some composites
it can do better (e.g. Reshard). It is intentionally easy to define and add
new ones. This actually seems the easier approach (to me at least, but
that's probably heavily influenced by what I'm familiar with vs. what I'm
not).

As for how well this maps onto the Flink Tables API, part of that depends
on how much of the API is the operations themselves, and how much is
concerning configuration/environment/etc. which is harder to talk about in
an agnostic way.

Using something like Py4j is an easy way to get up and running, especially
for a very faithful API, but the instant one wants to add UDFs one hits a
cliff of sorts (which is surmountable, but likely a lot harder than having
gone the above approach). In addition (and I'll admit this is rather
subjective) it seems to me one of the primary values of a table-like API in
a given language (vs. just using (say) plain old SQL itself via a console)
is the ability to embed it in a larger pipeline, or at least drop in
operations that are not (as) naturally expressed in the "table way,"
including existing libraries. In other words, a full SDK. The Py4j wrapping
doesn't extend itself to such integration nearly as easily.
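
For illustration, a rough sketch of the Py4J style of wrapping (the gateway
entry point and the table names here are invented, not an actual Flink API):

    from py4j.java_gateway import JavaGateway

    # Assumes a JVM-side gateway server exposing a TableEnvironment is running.
    gateway = JavaGateway()
    j_tenv = gateway.entry_point.getTableEnvironment()   # hypothetical entry point

    # Built-in expressions work nicely: the strings are forwarded to the Java
    # planner, so no Python code needs to run at execution time.
    result = j_tenv.scan("Orders").filter("amount > 10").select("user, amount")

    # The cliff: a Python function has no JVM-side counterpart to forward to,
    # so UDFs need a separate execution environment and data channel.
    # result = j_tenv.select(some_python_function)   # not expressible as a proxy call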

But I really do understand the desire to not block immediate work (and
value) for a longer term solution.

> >  If the DAG is generic enough to capture the additional information, we
probably still need some standardization, so that all the different
language APIs represent their expressions the same way
>
> I wonder whether that's necessary as a first step. I think it would be
> fine for Flink to have its own way to represent API concepts in the Beam
> DAG which Beam Runners may not be able to understand. We could then
> successively add the capability for these transforms to run with Beam.
>
> >  Similarly, it makes sense to standardize the type system (and type
inference) as far as built-in expressions and their interaction with UDFs
are concerned. The Flink Table API and Blink teams found this to be
essential for a consistent API behavior. This would not prevent all-UDF
programs from still using purely binary/opaque types.
>
> Beam has a set of standard coders which can be used across languages. We
> will have to expand those to play well with Flink's:
>
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#data-types
>
> I think we will need to exchange more ideas to work out a model that
> will work for both Flink and Beam. A regular meeting could be helpful.

+1, I think this would be really good for both this effort and general
collaboration between the Beam and Flink communities.

> Thanks,
> Max
>
> On 23.04.19 21:23, Stephan Ewen wrote:
> > Hi all!
> >
> > Below are my notes on the discussion last week on how to collaborate
> > between Beam and Flink.
> > The discussion was between Tyler, Kenn, Luke, Ahmed, Xiaowei, Shaoxuan,
> > Jincheng, and me.
> >
> > This represents my understanding of the discussion, please augment this
> > where I missed something or where your conclusion was different.
> >
> > Best,
> > Stephan
> >
> > =======================================================
> >
> > *Beams Python and Portability Framework*
> >
> >    - Portability core to Beam
> >    - Language independent dataflow DAG that is defined via ProtoBuf
> >    - DAG can be generated from various languages (Java, Python, Go)
> >    - The DAG describes the pipelines and contains additional parameters
> > to describe each operator, and contains artifacts that need to be
> > deployed / executed as part of an operator execution.
> >    - Operators execute in language-specific containers, data is
> > exchanged between the language-specific container and the runner
> > container (JVM) via gRPC.
> >
> > *Flink's desiderata for Python API*
> >
> >    - Python API should mirror Java / Scala Table API
> >    - All relational expressions that correspond to built-in functions
> > should be translated to corresponding expressions in the Table API. That
> > way the planner generated Java code for the data types and built-in
> > expressions, meaning no Python code is necessary during execution
> >    - UDFs should be supported and run similarly as in Beam's approach
> >    - Python programs should be similarly created and submitted/deployed
> > as Java / Scala programs (CLI, web, containerized, etc.)
> >
> > *Consensus to share inter-process communication code*
> >
> >    - Crucial code for robust setup and high performance data exchange
> > across processes
> >    - The code for the SDK harness, the artifact boostrapping, and the
> > data exchange make sense to share.
> >    - Ongoing discussion whether this can be a dedicated module with slim
> > dependencies in Beam
> >
> > *Potential Long Term Perspective: Share language-independent DAG
> > representation*
> >
> >    - Beam's language independent DAG could become a standard
> > representation used in both projects
> >    - Flink would need an way to receive that DAG, map it to the Table
> > API, execute it from there
> >    - The DAG would need to have a standardized representation of
> > functions and expressions that then get mapped to Table API expressions
> > to let the planner optimize those and generate Java code for those
> >    - Similar as UDFs are supported in the Table API, there would be
> > additional "external UDFs" that would go through the above mentioned
> > inter-process communication layer
> >
> >    - _Advantages:_
> >      => Flink and Beam could share more language bindings
> >      => Flink would execute Beam portability programs fast, without
> > intermediate abstraction and directly in the JVM for many operators.
> >           Abstraction is necessary around UDFs and to bridge between
> > serializers / coders, etc.
> >
> >    - _Open issues:_
> >      => Biggest question is whether the language-independent DAG is
> > expressive enough to capture all the expressions that we want to map
> > directly to Table API expressions. Currently much is hidden in opaque
> > UDFs. Kenn mentioned the structure should be flexible enough to capture
> > more expressions transparently.
> >
> >      => If the DAG is generic enough to capture the additional
> > information, we probably still need some standardization, so that all
> > the different language APIs represent their expressions the same way
> >      => Similarly, it makes sense to standardize the type system (and
> > type inference) as far as built-in expressions and their interaction
> > with UDFs are concerned. The Flink Table API and Blink teams found this
> > to be essential for a consistent API behavior. This would not prevent
> > all-UDF programs from still using purely binary/opaque types.
> >
> >   =>  We need to create a Python API that follows the same structure as
> > Flink's Table API that produces the language-independent DAG
> >
> > *Short-term approach in Flink*
> >
> >    - Goal is to not block Flink's Python effort on the long term
> > approach and the necessary design and evolution of the
> > language-independent DAG.
> >    - Depending on what the outcome of above investigation is, Flink may
> > initially go with a simple approach to map the Python Table API to the
> > the Java Table API via Py4J, as outlined in FLIP-38:
> >
https://docs.google.com/document/d/1ybYt-0xWRMa1Yf5VsuqGRtOfJBz4p74ZmDxZYg3j_h8

Re: [DISCUSS] FLIP-38 Support python language in flink TableAPI

Dian Fu-2
Thanks everyone for the discussion here.

Regarding the Java/Scala UDFs and the built-in UDFs executing in the current Flink way (directly in the JVM, not via RPC), I share the same thoughts as Max and Robert and I think it will not be a big problem. From the design doc, I guess the main reason to take the Py4J way instead of the DAG way at present is that the DAG approach has some limitations in scenarios such as interactive programming, which may be a strong requirement for data scientists.
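
To illustrate the kind of interactive use I mean (all names below are just a sketch, not the proposed API), a data scientist would typically build and inspect a job step by step in a shell or notebook:

    # Hypothetical Python Table API session; classes and methods are illustrative only.
    t_env = TableEnvironment.create()               # assume some environment factory
    orders = t_env.scan("Orders")
    big_orders = orders.filter("amount > 10")
    print(big_orders.to_pandas().head())            # inspect an intermediate result...

    totals = big_orders.group_by("user").select("user, amount.sum as total")
    print(totals.to_pandas())                       # ...then refine the query and look again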

> In addition (and I'll admit this is rather subjective) it seems to me one of the primary values of a table-like API in a given language (vs. just using (say) plain old SQL itself via a console) is the ability to embed it in a larger pipeline, or at least drop in operations that are not (as) naturally expressed in the "table way," including existing libraries. In other words, a full SDK. The Py4j wrapping doesn't extend itself to such integration nearly as easily.


Hi Robert, regarding "a larger pipeline", do you mean translating a table-like API job from/to another kind of API job, or embedding third-party libraries into a table-like API job via UDFs? Could you kindly explain why this would be a problem for Py4J but not a problem when the job is expressed as a DAG?

Thanks,
Dian



Re: [DISCUSS] FLIP-38 Support python language in flink TableAPI

jincheng sun
Hi Robert,

In addition to the questions described by Dian, I also want to know what
difficult problems the Py4J-based solution will encounter in adding UDF
support, which you mentioned as follows:

Using something like Py4j is an easy way to get up an running, especially
> for a very faithful API, but the instant one wants to add UDFs one hits a
> cliff of sorts (which is surmountable, but likely a lot harder than having
> gone the above approach).


I would appreciate it if you could share more specific cases.

Thanks,
Jincheng


Re: [DISCUSS] FLIP-38 Support python language in flink TableAPI

Robert Bradshaw
In reply to this post by Dian Fu-2
On Thu, Apr 25, 2019 at 5:59 AM Dian Fu <[hidden email]> wrote:
>
> Thanks everyone for the discussion here.
>
> Regarding the Java/Scala UDFs and the built-in UDFs executing in the current Flink way (directly in the JVM, not via RPC), I share the same thoughts as Max and Robert and I think it will not be a big problem. From the design doc, I guess the main reason to take the Py4J way instead of the DAG way at present is that the DAG approach has some limitations in scenarios such as interactive programming, which may be a strong requirement for data scientists.

I definitely agree that interactive is a strong requirement for the
data scientist (and others). I don't think this is incompatible with
the DAG model, and something I want to see more of. (For one
exploration, see BeamPython's (still WIP) InteractiveRunner). There
are lots of interesting challenges here (e.g. sampling, partial
results, optimal caching of results vs. re-execution, especially in
the face of fusion) that would be worth working out together.
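
As a rough sketch of the intended usage (the exact module paths and helper
functions are illustrative, since this is still WIP and may change):

    import apache_beam as beam
    from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
    import apache_beam.runners.interactive.interactive_beam as ib

    # The interactive runner caches intermediate PCollections so they can be
    # materialized and inspected, and the pipeline then extended further in a REPL.
    p = beam.Pipeline(InteractiveRunner())
    words = p | beam.Create(['to', 'be', 'or', 'not', 'to', 'be'])
    counts = words | beam.combiners.Count.PerElement()
    print(ib.collect(counts))   # materializes the results for inspection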

> In addition (and I'll admit this is rather subjective) it seems to me one of the primary values of a table-like API in a given language (vs. just using (say) plain old SQL itself via a console) is the ability to embed it in a larger pipeline, or at least drop in operations that are not (as) naturally expressed in the "table way," including existing libraries. In other words, a full SDK. The Py4j wrapping doesn't extend itself to such integration nearly as easily.
>
> Hi Robert, regarding "a larger pipeline", do you mean translating a table-like API job from/to another kind of API job, or embedding third-party libraries into a table-like API job via UDFs? Could you kindly explain why this would be a problem for Py4J but not a problem when the job is expressed as a DAG?

I'm talking about anything one would want to do after
tableEnv.toDataSet() or before tableEnv.registerTable(...). Unless you
plan on also wrapping the DataSet/DataStream APIs too, which is a much
taller task. Let alone wrapping all the libraries one might want to
use that are built on these APIs.
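
In purely hypothetical Python terms (none of these names exist, they just
mirror the Java Table API), the boundary looks roughly like this:

    # Assume t_env is a Python TableEnvironment wrapper; names are made up.
    def cleanup(record):
        return record                                    # placeholder Python logic

    orders = t_env.scan("Orders").filter("amount > 10")  # Table API: wrappable via Py4J
    ds = t_env.to_data_set(orders)                       # leaving the Table API...
    cleaned = ds.map(cleanup)                            # ...needs a wrapped DataSet API
    t_env.register_table("Cleaned", t_env.from_data_set(cleaned))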

If this is instead integrated at a higher level, you could swap back
and forth between the new Tables API and the existing Python SDK
(including libraries such as TFX, and cross-language capabilities)
almost for free.


Re: [DISCUSS] FLIP-38 Support python language in flink TableAPI

Robert Bradshaw
In reply to this post by jincheng sun
On Thu, Apr 25, 2019 at 6:04 AM jincheng sun <[hidden email]> wrote:
>
> Hi Robert,
>
> In addition to the questions described by Dian, I also want to know what difficult problems the Py4J-based solution will encounter in adding UDF support, which you mentioned as follows:
>
>> Using something like Py4j is an easy way to get up an running, especially for a very faithful API, but the instant one wants to add UDFs one hits a cliff of sorts (which is surmountable, but likely a lot harder than having gone the above approach).
>
> I would appreciate it if you could share more specific cases.

The orchestration involved in supporting UDFs is non-trivial. I think
it is true that a lot of effort can be saved by reusing significant
portions of the design, concepts, and even implementation we already
have for Beam, but rebuilding it out of the individual pieces (likely
necessitated by Py4J having hooked in at a lower level than the DAG)
is likely harder, both initially and ongoing, than simply leveraging
the complete, working package.

> On Thu, Apr 25, 2019 at 11:53 AM Dian Fu <[hidden email]> wrote:
>>
>> Thanks everyone for the discussion here.
>>
>> Regarding to the Java/Scala UDF and the built-in UDF to execute in the current Flink way (directly in JVM, not via RPC), I share the same thoughts with Max and Robert and I think it will not be a big problem. From the design doc, I guess the main reason to take the Py4J way instead of the DAG way at present is that DAG has some limitations in some scenarios such as interactive programing which may be a strong requirement for data scientist.
>>
>> > In addition (and I'll admit this is rather subjective) it seems to me one of the primary values of a table-like API in a given language (vs. just using (say) plain old SQL itself via a console) is the ability to embed it in a larger pipeline, or at least drop in operations that are not (as) naturally expressed in the "table way," including existing libraries. In other words, a full SDK. The Py4j wrapping doesn't extend itself to such integration nearly as easily.
>>
>>
>> Hi Robert, regarding to "a larger pipeline", do you mean translating a table-like API jobs from/to another kind of API job or embedding third-part libraries into a table-like API jobs via UDF? Could you kindly explain why this would be a problem for Py4J and will not be a problem if expressing the job with DAG?
>>
>> Thanks,
>> Dian
>>
>>
>> > On Apr 25, 2019, at 12:16 AM, Robert Bradshaw <[hidden email]> wrote:
>> >
>> > Thanks for the meeting summary, Stephan. Sound like you covered a lot of ground. Some more comments below, adding onto what Max has said.
>> >
>> > On Wed, Apr 24, 2019 at 3:20 PM Maximilian Michels <[hidden email] <mailto:[hidden email]>> wrote:
>> > >
>> > > Hi Stephan,
>> > >
>> > > This is excited! Thanks for sharing. The inter-process communication
>> > > code looks like the most natural choice as a common ground. To go
>> > > further, there are indeed some challenges to solve.
>> >
>> > It certainly does make sense to share this work, though it does to me seem like a rather low level to integrate at.
>> >
>> > > > => Biggest question is whether the language-independent DAG is expressive enough to capture all the expressions that we want to map directly to Table API expressions. Currently much is hidden in opaque UDFs. Kenn mentioned the structure should be flexible enough to capture more expressions transparently.
>> > >
>> > > Just to add some context how this could be done, there is the concept of
>> > > a FunctionSpec which is part of a transform in the DAG. FunctionSpec
>> > > contains a URN and with a payload. FunctionSpec can be either (1)
>> > > translated by the Runner directly, e.g. map to table API concepts or (2)
>> > > run a user-defined function with an Environment. It could be feasible
>> > > for Flink to choose the direct path, whereas Beam Runners would leverage
>> > > the more generic approach using UDFs. Granted, compatibility across
>> > > Flink and Beam would only work if both of the translation paths yielded
>> > > the same semantics.
>> >
>> > To elaborate a bit on this, Beam DAGs are built up by applying Transforms (basically operations) to PColections (the equivalent of dataset/datastream), but the key point here is that these transforms are often composite operations that expand out into smaller subtransforms. This expansion happens during pipeline construction, but with the recent work on cross language pipelines can happen out of process. This is one point of extendability. Secondly, and importantly, this composite structure is preserved in the DAG, and so a runner is free to ignore the provided expansion and supply its own (so long as semantically it produces exactly the same output). These composite operations can be identified by arbitrary URNs + payloads, and any runner that does not understand them simply uses the pre-provided expansion.
>> >
>> > The existing Flink runner operates on exactly this principle, translating URNs for the leaf operations (Map, Flatten, ...) as well as some composites it can do better (e.g. Reshard). It is intentionally easy to define and add new ones. This actually seems the easier approach (to me at least, but that's probably heavily influenced by what I'm familiar with vs. what I'm not).
>> >
>> > As for how well this maps onto the Flink Table API, part of that depends on how much of the API is the operations themselves, and how much concerns configuration/environment/etc., which is harder to talk about in an agnostic way.
>> >
>> > Using something like Py4j is an easy way to get up and running, especially for a very faithful API, but the instant one wants to add UDFs one hits a cliff of sorts (which is surmountable, but likely a lot harder than having gone with the above approach). In addition (and I'll admit this is rather subjective) it seems to me one of the primary values of a table-like API in a given language (vs. just using (say) plain old SQL itself via a console) is the ability to embed it in a larger pipeline, or at least drop in operations that are not (as) naturally expressed in the "table way," including existing libraries. In other words, a full SDK. The Py4j wrapping doesn't lend itself to such integration nearly as easily.
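
For readers less familiar with Py4J, the delegation style under discussion looks roughly like the sketch below; the gateway entry point and method names are assumptions for illustration, not an actual FLIP-38 API:

    # Sketch only: each Python call is forwarded to a Java object over a
    # Py4J gateway, so string-based expressions stay in the JVM. A Python
    # lambda/UDF has no JVM counterpart to forward to -- hence the "cliff".
    from py4j.java_gateway import JavaGateway

    gateway = JavaGateway()                                   # assumes a running gateway server
    j_table_env = gateway.entry_point.getTableEnvironment()   # hypothetical entry point

    j_orders = j_table_env.scan("Orders")                     # plain delegation works...
    j_result = j_orders.select("user, amount")                # ...for built-in expressions

    # ...but something like .select(my_python_fn) cannot simply be forwarded,
    # because the JVM side has no way to execute the Python function.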
>> >
>> > But I really do understand the desire to not block immediate work (and value) for a longer term solution.
>> >
>> > > >  If the DAG is generic enough to capture the additional information, we probably still need some standardization, so that all the different language APIs represent their expressions the same way
>> > >
>> > > I wonder whether that's necessary as a first step. I think it would be
>> > > fine for Flink to have its own way to represent API concepts in the Beam
>> > > DAG which Beam Runners may not be able to understand. We could then
>> > > successively add the capability for these transforms to run with Beam.
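
One way to picture this: the runner inspects the URN of each transform and takes the native path only for the ones it understands. The sketch below is illustrative pseudo-logic, not actual runner code; the URNs and handlers are invented:

    # Illustrative only: dispatch per transform based on its URN.
    FLINK_NATIVE_URNS = {
        "flink:transform:table_select:v1",    # hypothetical URNs
        "flink:transform:table_group_by:v1",
    }

    def translate_natively(transform):
        return "TableAPI(%s)" % transform["urn"]      # stand-in for real translation

    def run_via_sdk_harness(transform):
        return "SdkHarness(%s)" % transform["urn"]    # stand-in for the generic path

    def translate(transform):
        if transform["urn"] in FLINK_NATIVE_URNS:
            return translate_natively(transform)
        return run_via_sdk_harness(transform)

    print(translate({"urn": "flink:transform:table_select:v1"}))  # -> TableAPI(...)
    print(translate({"urn": "beam:transform:pardo:v1"}))          # -> SdkHarness(...)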
>> > >
>> > > >  Similarly, it makes sense to standardize the type system (and type inference) as far as built-in expressions and their interaction with UDFs are concerned. The Flink Table API and Blink teams found this to be essential for a consistent API behavior. This would not prevent all-UDF programs from still using purely binary/opaque types.
>> > >
>> > > Beam has a set of standard coders which can be used across languages. We
>> > > will have to expand those to play well with Flink's:
>> > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#data-types
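
To give a feel for the alignment question, a purely illustrative mapping from a few Flink Table API data types to Beam's standard coder URNs is sketched below; the actual correspondence (and the gaps, e.g. DECIMAL, TIMESTAMP, composite types) is exactly what would need design work:

    # Illustrative starting point only -- not an agreed mapping.
    FLINK_TYPE_TO_BEAM_CODER_URN = {
        "STRING":  "beam:coder:string_utf8:v1",
        "BOOLEAN": "beam:coder:bool:v1",
        "BIGINT":  "beam:coder:varint:v1",
        "DOUBLE":  "beam:coder:double:v1",
        "BYTES":   "beam:coder:bytes:v1",
        # DECIMAL, TIMESTAMP, ROW/ARRAY/MAP, ... have no obvious 1:1 match yet
    }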
>> > >
>> > > I think we will need to exchange more ideas to work out a model that
>> > > will work for both Flink and Beam. A regular meeting could be helpful.
>> >
>> > +1, I think this would be really good for both this effort and general collaboration between the Beam and Flink communities.
>> >
>> > > Thanks,
>> > > Max
>> > >
>> > > On 23.04.19 21:23, Stephan Ewen wrote:
>> > > > Hi all!
>> > > >
>> > > > Below are my notes on the discussion last week on how to collaborate
>> > > > between Beam and Flink.
>> > > > The discussion was between Tyler, Kenn, Luke, Ahmed, Xiaowei, Shaoxuan,
>> > > > Jincheng, and me.
>> > > >
>> > > > This represents my understanding of the discussion, please augment this
>> > > > where I missed something or where your conclusion was different.
>> > > >
>> > > > Best,
>> > > > Stephan
>> > > >
>> > > > =======================================================
>> > > >
>> > > > *Beams Python and Portability Framework*
>> > > >
>> > > >    - Portability core to Beam
>> > > >    - Language independent dataflow DAG that is defined via ProtoBuf
>> > > >    - DAG can be generated from various languages (Java, Python, Go)
>> > > >    - The DAG describes the pipelines and contains additional parameters
>> > > > to describe each operator, and contains artifacts that need to be
>> > > > deployed / executed as part of an operator execution.
>> > > >    - Operators execute in language-specific containers, data is
>> > > > exchanged between the language-specific container and the runner
>> > > > container (JVM) via gRPC.
>> > > >
>> > > > *Flink's desiderata for Python API*
>> > > >
>> > > >    - Python API should mirror Java / Scala Table API
>> > > >    - All relational expressions that correspond to built-in functions
>> > > > should be translated to corresponding expressions in the Table API. That
>> > > > way the planner generated Java code for the data types and built-in
>> > > > expressions, meaning no Python code is necessary during execution
>> > > >    - UDFs should be supported and run similarly as in Beam's approach
>> > > >    - Python programs should be similarly created and submitted/deployed
>> > > > as Java / Scala programs (CLI, web, containerized, etc.)
>> > > >
>> > > > *Consensus to share inter-process communication code*
>> > > >
>> > > >    - Crucial code for robust setup and high performance data exchange
>> > > > across processes
>> > > >    - The code for the SDK harness, the artifact boostrapping, and the
>> > > > data exchange make sense to share.
>> > > >    - Ongoing discussion whether this can be a dedicated module with slim
>> > > > dependencies in Beam
>> > > >
>> > > > *Potential Long Term Perspective: Share language-independent DAG
>> > > > representation*
>> > > >
>> > > >    - Beam's language independent DAG could become a standard
>> > > > representation used in both projects
>> > > >    - Flink would need an way to receive that DAG, map it to the Table
>> > > > API, execute it from there
>> > > >    - The DAG would need to have a standardized representation of
>> > > > functions and expressions that then get mapped to Table API expressions
>> > > > to let the planner optimize those and generate Java code for those
>> > > >    - Similar as UDFs are supported in the Table API, there would be
>> > > > additional "external UDFs" that would go through the above mentioned
>> > > > inter-process communication layer
>> > > >
>> > > >    - _Advantages:_
>> > > >      => Flink and Beam could share more language bindings
>> > > >      => Flink would execute Beam portability programs fast, without
>> > > > intermediate abstraction and directly in the JVM for many operators.
>> > > >           Abstraction is necessary around UDFs and to bridge between
>> > > > serializers / coders, etc.
>> > > >
>> > > >    - _Open issues:_
>> > > >      => Biggest question is whether the language-independent DAG is
>> > > > expressive enough to capture all the expressions that we want to map
>> > > > directly to Table API expressions. Currently much is hidden in opaque
>> > > > UDFs. Kenn mentioned the structure should be flexible enough to capture
>> > > > more expressions transparently.
>> > > >
>> > > >      => If the DAG is generic enough to capture the additional
>> > > > information, we probably still need some standardization, so that all
>> > > > the different language APIs represent their expressions the same way
>> > > >      => Similarly, it makes sense to standardize the type system (and
>> > > > type inference) as far as built-in expressions and their interaction
>> > > > with UDFs are concerned. The Flink Table API and Blink teams found this
>> > > > to be essential for a consistent API behavior. This would not prevent
>> > > > all-UDF programs from still using purely binary/opaque types.
>> > > >
>> > > >   =>  We need to create a Python API that follows the same structure as
>> > > > Flink's Table API that produces the language-independent DAG
>> > > >
>> > > > *Short-term approach in Flink*
>> > > >
>> > > >    - Goal is to not block Flink's Python effort on the long term
>> > > > approach and the necessary design and evolution of the
>> > > > language-independent DAG.
>> > > >    - Depending on what the outcome of above investigation is, Flink may
>> > > > initially go with a simple approach to map the Python Table API to the
>> > > > Java Table API via Py4J, as outlined in FLIP-38:
>> > > > https://docs.google.com/document/d/1ybYt-0xWRMa1Yf5VsuqGRtOfJBz4p74ZmDxZYg3j_h8
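
As a rough sketch of what the Py4J-based short-term approach could look like on the Python side, the wrapper class below mirrors a few Java Table API methods; the names follow the Java API, but this is an illustration, not the FLIP-38 implementation, and obtaining the initial Java object from the gateway is not shown:

    class Table(object):
        # Thin Python wrapper forwarding every call to a Java Table object
        # obtained through a Py4J gateway (assumed, not shown here).
        def __init__(self, j_table):
            self._j_table = j_table

        def select(self, fields):
            return Table(self._j_table.select(fields))

        def group_by(self, fields):
            return Table(self._j_table.groupBy(fields))

        def where(self, predicate):
            return Table(self._j_table.where(predicate))

    # Usage would then read much like the Java/Scala Table API, e.g.:
    #   result = orders.group_by("user").select("user, amount.sum as total")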