Configuring UDFs with user defined parameters

Matthias J. Sax-2
Hi,

I noticed that the DataSet API offers a nice way to configure
UDF operators through the method ".withParameters()". However, the
Streaming API does not offer such a method.
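
For readers unfamiliar with the pattern, here is a minimal plain-Java sketch
of what a ".withParameters()" style hook does: the operator keeps a set of
parameters and hands them to the function's open() method before any record
is processed. All class names below (RichFunctionSketch, OperatorSketch,
ScaleFunction, WithParametersDemo) and the "factor" key are hypothetical
stand-ins for illustration, not Flink classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a rich user function with an open() hook.
abstract class RichFunctionSketch {
    // Called once with the operator's parameters before any record is processed.
    void open(Map<String, String> parameters) {}

    abstract int map(int value);
}

// Hypothetical stand-in for an operator that carries per-operator parameters.
class OperatorSketch {
    private final RichFunctionSketch function;
    private Map<String, String> parameters = new HashMap<>();

    OperatorSketch(RichFunctionSketch function) {
        this.function = function;
    }

    // Attach parameters to this operator only; returns this for chaining.
    OperatorSketch withParameters(Map<String, String> parameters) {
        this.parameters = new HashMap<>(parameters);
        return this;
    }

    // Open the function with the stored parameters, then process one record.
    int run(int input) {
        function.open(parameters);
        return function.map(input);
    }
}

// Example user function that reads its configuration in open().
class ScaleFunction extends RichFunctionSketch {
    private int factor = 1;

    @Override
    void open(Map<String, String> parameters) {
        factor = Integer.parseInt(parameters.getOrDefault("factor", "1"));
    }

    @Override
    int map(int value) {
        return value * factor;
    }
}

class WithParametersDemo {
    static int scale(int input, String factor) {
        Map<String, String> params = new HashMap<>();
        params.put("factor", factor);
        return new OperatorSketch(new ScaleFunction()).withParameters(params).run(input);
    }

    public static void main(String[] args) {
        System.out.println(scale(7, "3")); // prints 21
    }
}
```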

For a current PR (https://github.com/apache/flink/pull/1046) this
feature would be very helpful.

As a workaround, PR #1046 can also be finished using JobConfiguration.
However, this seems to be somewhat unnatural. Furthermore, I think that
this feature would be nice to have in general. What do you think about it?

If we introduce this feature, we can either open a new JIRA or just
include it in the current PR #1046. Which would be the better way?


-Matthias


Re: Configuring UDFs with user defined parameters

Aljoscha Krettek-2
Hi,
I think the possibility to use a Configuration object is a legacy from the
past where the API was a bit closer to how Hadoop works. In my opinion this
is not necessary anymore since User Code objects can just contain
configuration settings in fields.

The feature for the Storm API could probably be implemented by just storing
a Configuration object in the user code function.
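
A minimal sketch of the "settings in fields" idea, assuming only that user
functions are shipped to the workers as serialized Java objects: the field set
in the constructor travels with the object. ThresholdFilter and
FieldConfigDemo are hypothetical names used for illustration, and the
byte-array round trip merely imitates shipping the function.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical user function: the threshold is ordinary state set in the constructor.
class ThresholdFilter implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int threshold;

    ThresholdFilter(int threshold) {
        this.threshold = threshold;
    }

    boolean filter(int value) {
        return value > threshold;
    }
}

class FieldConfigDemo {
    // Serialize and deserialize the function, imitating how it is shipped to a worker.
    static ThresholdFilter roundTrip(ThresholdFilter function) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(function);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ThresholdFilter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ThresholdFilter shipped = roundTrip(new ThresholdFilter(10));
        System.out.println(shipped.filter(11)); // true: 11 is above the threshold
        System.out.println(shipped.filter(10)); // false: 10 is not above the threshold
    }
}
```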

Regards,
Aljoscha

On Sun, 6 Sep 2015 at 18:29 Matthias J. Sax <[hidden email]> wrote:

> Hi,
>
> I noticed that the DataSet API offers a nice way to configure
> UDF operators through the method ".withParameters()". However, the
> Streaming API does not offer such a method.
>
> For a current PR (https://github.com/apache/flink/pull/1046) this
> feature would be very helpful.
>
> As a workaround, PR #1046 can also be finished using JobConfiguration.
> However, this seems to be somewhat unnatural. Furthermore, I think that
> this feature would be nice to have in general. What do you think about it?
>
> If we introduce this feature, we can either open a new JIRA or just
> include it in the current PR #1046. Which would be the better way?
>
>
> -Matthias
>
>

Re: Configuring UDFs with user defined parameters

Matthias J. Sax-2
Thanks for the input.

However, I doubt that a member variable approach is feasible, because
when the Storm topology is translated into a Flink program (in
`FlinkBuilder.createTopology()`) the Storm configuration is not
available yet. And adding the configuration later to each operator would
be cumbersome.

If there are no better ideas, I guess the current usage of
JobConfiguration is the best way to handle it (because extending
TaskConfiguration does not seem to be an option).

-Matthias

On 09/06/2015 10:51 PM, Aljoscha Krettek wrote:

> Hi,
> I think the possibility to use a Configuration object is a legacy from the
> past where the API was a bit closer to how Hadoop works. In my opinion this
> is not necessary anymore since User Code objects can just contain
> configuration settings in fields.
>
> The feature for the Storm API could probably be implemented by just storing
> a Configuration object in the user code function.
>
> Regards,
> Aljoscha


Re: Configuring UDFs with user defined parameters

Stephan Ewen
The JobConfig is a system-level config. It would be nice not to expose it
at the user level unless necessary.

What about using the ExecutionConfig, where you can add shared user-level
parameters?

On Mon, Sep 7, 2015 at 1:39 PM, Matthias J. Sax <[hidden email]> wrote:

> Thanks for the input.
>
> However, I doubt that a member variable approach is feasible, because
> when the Storm topology is translated into a Flink program (in
> `FlinkBuilder.createTopology()`) the Storm configuration is not
> available yet. And adding the configuration later to each operator would
> be cumbersome.
>
> If there are no better ideas, I guess the current usage of
> JobConfiguration is the best way to handle it (because extending
> TaskConfiguration does not seem to be an option).
>
> -Matthias

Re: Configuring UDFs with user defined parameters

Matthias J. Sax-2
This might sound stupid, but how can such a configuration be set?

StreamExecutionEnvironment only offers ".getConfig()".

-Matthias

On 09/07/2015 03:05 PM, Stephan Ewen wrote:

> The JobConfig is a system-level config. It would be nice not to expose it
> at the user level unless necessary.
>
> What about using the ExecutionConfig, where you can add shared user-level
> parameters?


Re: Configuring UDFs with user defined parameters

Fabian Hueske-2
Ah, here's the discussion I was looking for :-)
I think Stephan refers to ExecutionConfig.setGlobalJobParameters().

2015-09-15 0:25 GMT+02:00 Matthias J. Sax <[hidden email]>:

> This might sound stupid, but how can such a configuration be set?
>
> StreamExecutionEnvironment only offers ".getConfig()".
>
> -Matthias

Re: Configuring UDFs with user defined parameters

Matthias J. Sax-2
Thanks, now I understand how to do it:

We just use
> env.getConfig().setGlobalJobParameters(new StormConfig());

with "StormConfig extends GlobalJobParameters".

We can access this configuration in a SourceFunction via
> getRuntimeContext().getExecutionConfig().getGlobalJobParameters()

and in a StreamOperator via
> executionConfig.getGlobalJobParameters()
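
To illustrate why this fits the Storm case, here is a plain-Java sketch of
the pattern: operators are built first and only hold a reference to a shared
execution config, and the job-wide parameters are registered afterwards. The
classes JobParametersSketch, StormConfigSketch, ExecConfigSketch and
PrefixingOperator, and the "prefix" key, are hypothetical stand-ins, not
Flink classes; only the two method names are borrowed from the calls above.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a job-wide parameters base type.
abstract class JobParametersSketch implements Serializable {
    abstract Map<String, String> toMap();
}

// Hypothetical stand-in for the Storm configuration wrapper.
class StormConfigSketch extends JobParametersSketch {
    private static final long serialVersionUID = 1L;

    private final Map<String, String> entries = new HashMap<>();

    StormConfigSketch put(String key, String value) {
        entries.put(key, value);
        return this;
    }

    @Override
    Map<String, String> toMap() {
        return Collections.unmodifiableMap(entries);
    }
}

// Hypothetical stand-in for an execution config shared by all operators of a job.
class ExecConfigSketch {
    private JobParametersSketch globalJobParameters;

    void setGlobalJobParameters(JobParametersSketch parameters) {
        this.globalJobParameters = parameters;
    }

    JobParametersSketch getGlobalJobParameters() {
        return globalJobParameters;
    }
}

// Hypothetical operator that looks up a setting only when it processes a record.
class PrefixingOperator {
    private final ExecConfigSketch executionConfig;

    PrefixingOperator(ExecConfigSketch executionConfig) {
        this.executionConfig = executionConfig;
    }

    String process(String record) {
        Map<String, String> params = executionConfig.getGlobalJobParameters().toMap();
        return params.getOrDefault("prefix", "") + record;
    }
}

class GlobalParametersDemo {
    public static void main(String[] args) {
        ExecConfigSketch config = new ExecConfigSketch();
        // The operator is created before the configuration is known.
        PrefixingOperator operator = new PrefixingOperator(config);
        // The job-wide parameters are registered later, once, for all operators.
        config.setGlobalJobParameters(new StormConfigSketch().put("prefix", "storm-"));
        System.out.println(operator.process("tuple")); // prints storm-tuple
    }
}
```

Because the lookup happens at processing time, nothing has to be pushed into
each operator individually after the topology has been translated.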

-Matthias


On 09/15/2015 12:31 AM, Fabian Hueske wrote:

> Ah, here's the discussion I was looking for :-)
> I think Stephan refers to ExecutionConfig.setGlobalJobParameters().


Re: Configuring UDFs with user defined parameters

Stephan Ewen
Yes, exactly...

On Tue, Sep 15, 2015 at 1:15 AM, Matthias J. Sax <[hidden email]> wrote:

> Thanks, now I understand how to do it:
>
> We just use
> > env.getConfig().setGlobalJobParameters(new StormConfig());
>
> with "StormConfig extends GlobalJobParameters".
>
> We can access this configuration in a SourceFunction via
> > getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
>
> and in a StreamOperator via
> > executionConfig.getGlobalJobParameters()
>
> -Matthias