FLINK-3750 (JDBCInputFormat)

FLINK-3750 (JDBCInputFormat)

Flavio Pompermaier
Hi guys,

I'm integrating Chesnay's comments into my PR, but there are a couple of
things that I'd like to discuss with the core developers.


   1. About the JDBC type mapping (addValue() method at [1]): at the moment,
   if I find a null value for a Double, JDBC's getDouble() returns 0.0. Is
   that really the correct behaviour? Wouldn't it be better to use a POJO or
   Row, which can handle null values? Moreover, the mapping between SQL
   types and Java types varies considerably between JDBC implementations.
   Wouldn't it be better to rely on the Java type returned by
   resultSet.getObject() for this mapping rather than on the
   ResultSetMetaData types? (See the sketch after the link below.)
   2. I'd like to handle connections very efficiently because we have a use
   case with billions of records and thus millions of splits, and
   establishing a new connection each time could be expensive. Would it be a
   problem to add an Apache pool dependency to the JDBC batch connector in
   order to reuse the created connections?


[1]
https://github.com/fpompermaier/flink/blob/FLINK-3750/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
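
For context, a minimal, self-contained sketch of the difference in question. It is
not the connector's code; it assumes the embedded Derby driver is on the classpath
and uses a throwaway in-memory table.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class NullAwareReadSketch {
    public static void main(String[] args) throws Exception {
        // In-memory Derby database, just for the demo.
        try (Connection conn = DriverManager.getConnection("jdbc:derby:memory:demo;create=true");
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("CREATE TABLE books (price DOUBLE)");
            stmt.executeUpdate("INSERT INTO books VALUES (9.99), (NULL)");

            try (ResultSet rs = stmt.executeQuery("SELECT price FROM books")) {
                while (rs.next()) {
                    // getDouble() silently turns SQL NULL into 0.0 ...
                    double primitive = rs.getDouble("price");
                    // ... unless wasNull() is consulted right afterwards.
                    Double viaWasNull = rs.wasNull() ? null : primitive;
                    // getObject() preserves the NULL directly.
                    Object viaGetObject = rs.getObject("price");

                    System.out.println(primitive + " / " + viaWasNull + " / " + viaGetObject);
                }
            }
        }
    }
}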

Re: FLINK-3750 (JDBCInputFormat)

Fabian Hueske-2
Hi Flavio,

Those are good questions.

1) Replacing null values by default values and simply forwarding records is
very dangerous, in my opinion.
I see two alternatives: A) we use a data type that tolerates null values.
This could be a POJO that the user has to provide or Row. The drawback of
Row is that it is untyped and not easy to handle. B) We use Tuple and add
an additional field that holds an Integer which serves as a bitset to mark
null fields. This would be a pretty low level API though. I am leaning
towards the user-provided POJO option.
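
To make option B concrete, a hedged sketch; the record layout and the isNull()
helper are illustrative only, not an existing Flink API.

import org.apache.flink.api.java.tuple.Tuple3;

public class NullMaskSketch {

    // Bit i of the mask (last tuple field) is set when data field i was SQL NULL.
    static boolean isNull(Tuple3<String, Double, Integer> record, int fieldIndex) {
        return (record.f2 & (1 << fieldIndex)) != 0;
    }

    public static void main(String[] args) {
        // "Alice" is present; the Double column was NULL, so bit 1 is set
        // and the value field carries a dummy default.
        Tuple3<String, Double, Integer> record = new Tuple3<>("Alice", 0.0, 1 << 1);

        System.out.println("name null?  " + isNull(record, 0));  // false
        System.out.println("price null? " + isNull(record, 1));  // true
    }
}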

2) The JDBCInputFormat is located in a dedicated Maven module. I think we
can add a dependency to that module. However, it should also be possible to
reuse the same connection of an InputFormat across InputSplits, i.e., calls
of the open() method. Wouldn't that be sufficient?

Best, Fabian


Re: FLINK-3750 (JDBCInputFormat)

Chesnay Schepler
On 14.04.2016 17:22, Fabian Hueske wrote:

> Hi Flavio,
>
> that are good questions.
>
> 1) Replacing null values by default values and simply forwarding records is
> very dangerous, in my opinion.
> I see two alternatives: A) we use a data type that tolerates null values.
> This could be a POJO that the user has to provide or Row. The drawback of
> Row is that it is untyped and not easy to handle. B) We use Tuple and add
> an additional field that holds an Integer which serves as a bitset to mark
> null fields. This would be a pretty low level API though. I am leaning
> towards the user-provided POJO option.
I would also lean towards the POJO option.
>
> 2) The JDBCInputFormat is located in a dedicated Maven module. I think we
> can add a dependency to that module. However, it should also be possible to
> reuse the same connection of an InputFormat across InputSplits, i.e., calls
> of the open() method. Wouldn't that be sufficient?
This is the right approach, IMO.


Re: FLINK-3750 (JDBCInputFormat)

Flavio Pompermaier
I didn't understand what you mean by "it should also be possible to reuse
the same connection of an InputFormat across InputSplits, i.e., calls of
the open() method".
At the moment the open() method calls establishConnection(), so a new
connection is created for each split.
If I understood correctly, you're suggesting to create a pool in the
InputFormat and simply call pool.borrow() in open() rather than
establishConnection()?


Re: FLINK-3750 (JDBCInputFormat)

Fabian Hueske
An InputFormat object processes several InputSplits, so open() is
repeatedly called on the same object.
I suggest creating the connection in the first open() call and reusing it
in all subsequent open() calls.

So no pool at all ;-)


Re: FLINK-3750 (JDBCInputFormat)

Chesnay Schepler
No.

if (connection == null) {
  establishConnection();
}

Done; same connection for all splits.
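
Spelled out a little more, a hedged sketch of how that could look (the names
and the connection URL are illustrative, not the PR's actual code):

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class LazyConnectionSketch {

    private Connection dbConn;     // shared across all splits of this instance
    private Statement statement;   // per split
    private ResultSet resultSet;   // per split

    // Mirrors what InputFormat.open(split) could do for each split.
    public void openSplit(String splitQuery) throws IOException {
        try {
            if (dbConn == null) {  // first split handled by this instance: connect once
                dbConn = DriverManager.getConnection("jdbc:derby:memory:demo;create=true");
            }
            statement = dbConn.createStatement();
            resultSet = statement.executeQuery(splitQuery);
        } catch (SQLException e) {
            throw new IOException("Could not open split", e);
        }
    }
}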


Re: FLINK-3750 (JDBCInputFormat)

Flavio Pompermaier
OK, thanks! Just one last question: is an InputFormat instantiated once per
task slot, or once per TaskManager?

Re: FLINK-3750 (JDBCInputFormat)

Fabian Hueske
There is an InputFormat object for each parallel task of a DataSource.
So for a source with parallelism 8 you will have 8 instances of the
InputFormat running, regardless of whether this is one machine with 8 slots
or 8 machines with 1 slot each.
The same is true for all other operators (Map, Reduce, Join, etc.) and
DataSinks.

Note that a single task does not fill a slot; rather, a "slice" of the
program (one parallel task of each operator) fills a slot.

Cheers, Fabian
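
As a hedged, usage-level illustration (the file path is a placeholder, and any
built-in format behaves the same way): setting the source parallelism to 8
means eight instances of the underlying InputFormat, each handling a share of
the input splits.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ParallelismSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // readTextFile() is backed by a file InputFormat; with parallelism 8,
        // eight parallel instances of that format run, regardless of how the
        // slots are distributed over TaskManagers.
        DataSet<String> lines = env.readTextFile("/tmp/input.txt").setParallelism(8);

        System.out.println(lines.count());
    }
}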


Re: FLINK-3750 (JDBCInputFormat)

Flavio Pompermaier
Following your suggestions, I've fixed the connection reuse in my PR at
https://github.com/apache/flink/pull/1885.
I simply check in establishConnection() whether dbConn != null and, in that
case, return immediately.

Thus, the only remaining thing to fix is the null handling. Do you have any
suggestion on how to transform the results into a POJO?
Maybe returning a Row and then letting the user manage the conversion to the
target POJO in a subsequent map would be a more general solution?

Best,
Flavio
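
For what it's worth, a hedged sketch of the "Row first, POJO in a subsequent
map" idea; the GenericRow stand-in and the Book POJO are made up for the
example (the real Row type would come from Flink).

import org.apache.flink.api.common.functions.MapFunction;

public class RowToPojoSketch {

    // Stand-in for a generic, untyped row with positional access.
    public static class GenericRow {
        private final Object[] fields;
        public GenericRow(Object... fields) { this.fields = fields; }
        public Object getField(int i) { return fields[i]; }
    }

    // Example user POJO; a null price is representable because Double is boxed.
    public static class Book {
        public String title;
        public Double price;
    }

    // The user-provided conversion that would run in a map() after the source.
    public static class RowToBook implements MapFunction<GenericRow, Book> {
        @Override
        public Book map(GenericRow row) {
            Book b = new Book();
            b.title = (String) row.getField(0);
            b.price = (Double) row.getField(1);  // stays null if the column was NULL
            return b;
        }
    }

    public static void main(String[] args) throws Exception {
        Book b = new RowToBook().map(new GenericRow("Flink", null));
        System.out.println(b.title + " / " + b.price);  // Flink / null
    }
}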


Re: FLINK-3750 (JDBCInputFormat)

Stefano Bortoli
If we share the connection, then we should also be careful with the close()
implementation. I did not see changes for this method in the PR.

saluti,
Stefano
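
To illustrate the concern, a hedged sketch of the distinction between
per-split cleanup and connection teardown (the names are illustrative, not the
PR's code):

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class CloseSketch {

    private Connection dbConn;    // shared across splits
    private Statement statement;  // per split
    private ResultSet resultSet;  // per split

    // Mirrors InputFormat.close(), which runs after every split: release the
    // per-split resources, but leave the shared connection open.
    public void closeSplit() throws IOException {
        try {
            if (resultSet != null) { resultSet.close(); resultSet = null; }
            if (statement != null) { statement.close(); statement = null; }
        } catch (SQLException e) {
            throw new IOException("Could not close split resources", e);
        }
    }

    // The connection must be closed somewhere after the last split; exactly
    // which hook that should be is what the rest of this thread discusses.
    public void closeConnection() throws IOException {
        try {
            if (dbConn != null) { dbConn.close(); dbConn = null; }
        } catch (SQLException e) {
            throw new IOException("Could not close connection", e);
        }
    }
}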


Re: FLINK-3750 (JDBCInputFormat)

Flavio Pompermaier
Talking with Stefano this morning and looking at the DataSourceTask code, we
discovered that the open() and close() methods are both called for every
split, not once per InputFormat instance (maybe open and close should be
renamed to openSplit and closeSplit to avoid confusion...).
I think it could be worth adding two methods to the InputFormat (e.g.
openInputFormat() and closeInputFormat()) to allow managing the InputFormat
lifecycle (a sketch follows below). Otherwise I'll need to instantiate a pool
(and thus add a dependency) to avoid creating a new connection (an expensive
operation) for every split, which in our use case happens millions of times.

What about the output of the InputFormat? How do you want me to proceed?
With POJO or Row? If POJO, which strategy do you suggest?

Best,
Flavio
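
A hedged sketch of the proposed hooks (the method names follow the proposal
above; everything else, including the connection URL, is illustrative):

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public abstract class LifecycleAwareJdbcFormatSketch {

    protected Connection dbConn;

    // Would be called once per InputFormat instance, before the first split.
    public void openInputFormat() throws IOException {
        try {
            dbConn = DriverManager.getConnection("jdbc:derby:memory:demo;create=true");
        } catch (SQLException e) {
            throw new IOException("Could not establish connection", e);
        }
    }

    // Would be called once per InputFormat instance, after the last split.
    public void closeInputFormat() throws IOException {
        try {
            if (dbConn != null) {
                dbConn.close();
                dbConn = null;
            }
        } catch (SQLException e) {
            throw new IOException("Could not close connection", e);
        }
    }
}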


Re: FLINK-3750 (JDBCInputFormat)

Stefano Bortoli
Since this is a generic JDBC input format, I would prefer to stay with Row,
letting the developer manage the casts according to the driver's
capabilities.

As for the open() and close() issue, I agree with Flavio that we need better
management of the InputFormat lifecycle. Perhaps a new interface extending
it: RichInputFormat?

My 2c.

Stefano


Re: FLINK-3750 (JDBCInputFormat)

Stefano Bortoli
Of course there is one already. We'll look into the runtime context.

saluti,
Stefano


Re: FLINK-3750 (JDBCInputFormat)

Aljoscha Krettek-2
There is also InputFormat.configure() which is called before any split
processing happens. But I see your point about a missing close() method
that is called after all input splits have been processed.
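
For illustration, a minimal sketch of the lazy-initialization workaround discussed
in this thread. This is not the actual JDBCInputFormat code; class, method and URL
names are placeholders.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;

    // Sketch only: a "format" whose openSplit()/closeSplit() are invoked once per
    // input split, reusing a single connection across all splits of this instance.
    public class LazyConnectionSketch {

        private Connection dbConn;  // shared by all splits of this parallel instance

        // called once per input split
        public void openSplit(String splitQuery) throws SQLException {
            if (dbConn == null) {
                // first split handled by this instance: connect exactly once
                dbConn = DriverManager.getConnection("jdbc:<driver-specific-url>");
            }
            // ... create the per-split statement / result set for splitQuery here ...
        }

        // also called once per input split
        public void closeSplit() {
            // release only per-split resources and keep dbConn open so the next
            // split can reuse it; as noted above, nothing runs after the LAST
            // split, so the shared connection is never closed explicitly.
        }
    }

With this pattern each parallel instance opens at most one connection, but there is
no place to release it when the job finishes, which is exactly the missing teardown
hook mentioned above.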


Re: FLINK-3750 (JDBCInputFormat)

Flavio Pompermaier
Yes, I forgot to mention that I could instantiate the connection in
configure(), but then I can't close it (as you confirmed) :(


Re: FLINK-3750 (JDBCInputFormat)

Fabian Hueske-2
I agree, a method to close an input format is missing.
InputFormat is an API-stable interface, so it is not possible to extend it
(until Flink 2.0). RichInputFormat is API-stable as well, but it is an abstract
class, so it should be possible to add an empty default implementation of a
closeInputFormat() method there.
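
To make this concrete, a rough sketch of what such hooks could look like. This is
not actual Flink code; the method names follow the openInputFormat()/closeInputFormat()
naming suggested earlier in the thread, and the JDBC subclass is purely illustrative.

    // Sketch: the abstract rich format gains no-op lifecycle hooks that run once
    // per parallel instance, before the first split and after the last one.
    abstract class SketchRichInputFormat<OT> {

        /** Called once per parallel instance, before the first split is opened. */
        public void openInputFormat() {
            // empty default implementation keeps existing formats source compatible
        }

        /** Called once per parallel instance, after the last split is closed. */
        public void closeInputFormat() {
            // empty default implementation
        }
    }

    // How a JDBC-style format could use the hooks:
    class SketchJdbcFormat extends SketchRichInputFormat<Object> {

        private java.sql.Connection dbConn;

        @Override
        public void openInputFormat() {
            try {
                dbConn = java.sql.DriverManager.getConnection("jdbc:<driver-specific-url>");
            } catch (java.sql.SQLException e) {
                throw new IllegalStateException("could not open connection", e);
            }
        }

        @Override
        public void closeInputFormat() {
            try {
                if (dbConn != null) {
                    dbConn.close();  // finally a well-defined place to release it
                }
            } catch (java.sql.SQLException e) {
                throw new IllegalStateException("could not close connection", e);
            }
        }
    }

Because the defaults are empty, existing formats compile and behave unchanged, which
is why the abstract class (unlike the frozen InputFormat interface) can carry the
addition.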

Of course it would be good to re-use connections across input splits.
On the other hand, input splits should not be too fine-grained either,
because input split assignment itself has some overhead.

Best, Fabian


Re: FLINK-3750 (JDBCInputFormat)

Flavio Pompermaier
Hi Fabian, I've just created a JIRA for that (FLINK-3777).
As you said, input splits should not be too fine-grained, but we have a table
with 11 billion rows that can't be queried with ranges greater than 100K rows:
the query involves a lot of JOINs, and increasing this threshold leads to
dramatically longer response times. This implies millions of splits and, thus,
millions of calls to open() and millions of connection re-creations.. :(
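
A back-of-envelope sketch of the numbers involved, assuming the splits are built
as dense id ranges (an assumption; the real splitting strategy may differ):

    public class RangeSplitSketch {
        public static void main(String[] args) {
            long totalRows = 11_000_000_000L;  // ~11 billion rows in the table
            long rowsPerSplit = 100_000L;      // largest range the joins tolerate

            long numSplits = (totalRows + rowsPerSplit - 1) / rowsPerSplit;
            // on the order of 10^5 splits for this one table alone
            System.out.println("splits for this table: " + numSplits);

            // each split becomes one parameterized query and, without connection
            // reuse, one brand new connection as well:
            for (long i = 0; i < 3; i++) {
                long lower = i * rowsPerSplit;
                long upper = lower + rowsPerSplit - 1;
                System.out.println("split " + i + ": WHERE id BETWEEN " + lower + " AND " + upper);
            }
        }
    }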



Re: FLINK-3750 (JDBCInputFormat)

Stefano Bortoli
As Flavio underlined, it is not about selecting a certain number of rows,
but about executing queries with a sequence of joins on a very large database.
We played around to find the best throughput. Honestly, I prefer to have many
smaller range queries with more parallel threads than fewer expensive queries.
Of course this makes sense if we can reuse the connection. Flavio created a PR
that should provide a good starting point for dealing with these types of
connections.
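
To put rough numbers on that trade-off: assuming, say, 100 ms to establish a
connection (a made-up figure), a job with one million splits would spend about
100,000 seconds, i.e. roughly 28 hours, on connection handshakes alone if every
split reconnects, versus a handful of connections in total when each parallel
task keeps its connection open. The smaller the ranges, the more the reuse matters.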

thanks a lot for the great support!

saluti,
Stefano
