Hi guys,
I'm integrating Chesnay's comments into my PR, but there are a couple of things I'd like to discuss with the core developers.

1. About the JDBC type mapping (the addValue() method at [1]): at the moment, if I find a null value for a Double, JDBC's getDouble() returns 0.0 (illustrated in the snippet below). Is that really the correct behaviour? Wouldn't it be better to use a POJO, or the Row of the Table API, which can handle null values? Moreover, the mapping between SQL types and Java types varies a lot between individual JDBC implementations. Wouldn't it be better to rely on the Java type returned by resultSet.getObject() for this mapping, rather than on the ResultSetMetaData types?

2. I'd like to handle connections very efficiently because we have a use case with billions of records and thus millions of splits, and establishing a new connection each time could be expensive. Would it be a problem to add an Apache Commons Pool dependency to the JDBC batch connector in order to reuse the created connections?

[1]
https://github.com/fpompermaier/flink/blob/FLINK-3750/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
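To make the problem concrete, here is a small illustrative snippet (the column name is made up, not from the PR): getDouble() silently maps SQL NULL to 0.0 unless wasNull() is checked right afterwards, whereas getObject() returns null and follows the driver's own SQL-to-Java mapping.

    // Illustrative sketch of the NULL problem described above (column name is hypothetical).
    double price = resultSet.getDouble("PRICE");    // SQL NULL silently becomes 0.0
    boolean priceWasNull = resultSet.wasNull();     // must be called right after the getter to detect NULL
    Object priceObj = resultSet.getObject("PRICE"); // returns null for SQL NULL, otherwise the
                                                    // Java type chosen by the driver's own mapping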
Hi Flavio,
those are good questions.

1) Replacing null values by default values and simply forwarding records is very dangerous, in my opinion. I see two alternatives: A) We use a data type that tolerates null values. This could be a POJO that the user has to provide, or Row. The drawback of Row is that it is untyped and not easy to handle. B) We use Tuple and add an additional field that holds an Integer which serves as a bitset to mark null fields (sketched below). This would be a pretty low-level API, though. I am leaning towards the user-provided POJO option.

2) The JDBCInputFormat is located in a dedicated Maven module. I think we can add a dependency to that module. However, it should also be possible to reuse the same connection of an InputFormat across InputSplits, i.e., across calls of the open() method. Wouldn't that be sufficient?

Best, Fabian
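To make option B concrete, a rough sketch of the bitset idea for a two-column result (this is only an illustration of the proposal, not code from the PR; it assumes a ResultSet named resultSet and Flink's Tuple3 in scope):

    // Option B sketch: data fields plus an Integer whose bit i marks "column i was SQL NULL".
    Tuple3<String, Double, Integer> record = new Tuple3<>();
    int nullMask = 0;
    Object name = resultSet.getObject(1);                  // null-tolerant accessor
    record.f0 = (name != null) ? (String) name : "";
    if (name == null) { nullMask |= 1 << 0; }
    Object price = resultSet.getObject(2);
    record.f1 = (price != null) ? ((Number) price).doubleValue() : 0.0;
    if (price == null) { nullMask |= 1 << 1; }
    record.f2 = nullMask;                                  // consumers test (record.f2 & (1 << i)) != 0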
On 14.04.2016 17:22, Fabian Hueske wrote:
> 1) Replacing null values by default values and simply forwarding records is very dangerous, in my opinion. [...] I am leaning towards the user-provided POJO option.

i would also lean towards the POJO option.

> 2) The JDBCInputFormat is located in a dedicated Maven module. I think we can add a dependency to that module. However, it should also be possible to reuse the same connection of an InputFormat across InputSplits, i.e., calls of the open() method. Wouldn't that be sufficient?

this is the right approach imo.
I didn't understand what you mean by "it should also be possible to reuse the same connection of an InputFormat across InputSplits, i.e., calls of the open() method".
At the moment the open() method calls establishConnection(), thus a new connection is created for each split.
If I understood correctly, you're suggesting to create a pool in the InputFormat and simply call pool.borrow() in open() rather than establishConnection()?
An InputFormat object processes several InputSplits, so open() is repeatedly called on the same object. I suggest creating the connection in the first open() call and reusing it in all subsequent open() calls. So no pool at all ;-)
no.
if (connection == null) {
    establishConnection();
}

done. same connection for all splits.
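In the JDBCInputFormat that check would sit at the top of open(InputSplit); a simplified sketch of the idea (not the actual PR code; queryForSplit() is a made-up helper) could look like this:

    // Sketch only: create the connection lazily on the first split, reuse it afterwards.
    @Override
    public void open(InputSplit split) throws IOException {
        try {
            if (dbConn == null) {                  // first open() call on this parallel instance
                establishConnection();             // sets dbConn from the configured driver/URL/credentials
            }
            statement = dbConn.createStatement();
            resultSet = statement.executeQuery(queryForSplit(split)); // queryForSplit() is hypothetical
        } catch (SQLException e) {
            throw new IOException("Could not open input split", e);
        }
    }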
Ok, thanks! Just one last question: is an InputFormat instantiated once per task slot or once per task manager?
There is an InputFormat object for each parallel task of a DataSource. So for a source with parallelism 8 you will have 8 instances of the InputFormat running, regardless of whether this is on one box with 8 slots or on 8 machines with 1 slot each. The same is true for all other operators (Map, Reduce, Join, etc.) and DataSinks.

Note, a single task does not fill a slot; rather, a "slice" of the program (one parallel task of each operator) fills a slot.

Cheers, Fabian
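As a small illustration of this (not from the thread; the jdbcInputFormat variable is assumed to be built elsewhere via its builder), the number of InputFormat copies simply follows the parallelism of the source:

    // With parallelism 8 there will be 8 JDBCInputFormat instances, one per parallel source task,
    // regardless of how the 8 slots are spread over task managers.
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.createInput(jdbcInputFormat)
       .setParallelism(8);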
Following your suggestions I've fixed the connection reuse in my PR at
https://github.com/apache/flink/pull/1885. I simply check in establishConnection() whether dbConn != null and, in that case, return immediately.

Thus, the only remaining thing to fix is the null handling. Do you have any suggestion about how to transform the results into a POJO? Maybe returning a Row and then letting the user manage the conversion to the target POJO in a subsequent map could be a more general solution?

Best,
Flavio
If we share the connection, then we should also be careful with the close()
implementation. I did not see changes for this method in the PR.

saluti,
Stefano
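To make the concern explicit, a sketch (an assumption, not the PR code) of a close() that only releases per-split resources and deliberately keeps the shared connection open; the connection then still needs to be closed somewhere once all splits are processed:

    // Sketch only: per-split cleanup that must NOT close the shared dbConn,
    // otherwise every subsequent open() would have to re-establish the connection.
    @Override
    public void close() throws IOException {
        try {
            if (resultSet != null) {
                resultSet.close();
            }
            if (statement != null) {
                statement.close();
            }
            // dbConn is intentionally left open here; closing it is the open question raised above.
        } catch (SQLException e) {
            throw new IOException("Could not close split resources", e);
        }
    }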
Talking with Stefano this morning and looking at the DataSourceTask code we
discovered that the open() and close() methods are both called for every split and not once per InputFormat instance (maybe open and close should be renamed to openSplit and closeSplit to avoid confusion...). I think it could be worth adding two methods to the InputFormat (e.g. openInputFormat() and closeInputFormat(), sketched below) to allow managing the InputFormat lifecycle; otherwise I'll need to instantiate a pool (and thus add a dependency) to avoid creating a new connection (an expensive operation) for every split, which in our use case happens millions of times.

What about the output of the InputFormat? How do you want me to proceed? With POJO or Row? If POJO, which strategy do you suggest?

Best,
Flavio
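A rough sketch of what the two proposed hooks could look like in the JDBC format (the method names are the ones proposed above; the connection fields are assumptions about the format's configuration, and this is an illustration rather than an agreed API):

    // Proposed per-instance lifecycle: called once per InputFormat instance, not once per split.
    public void openInputFormat() throws IOException {
        try {
            dbConn = DriverManager.getConnection(dbURL, username, password); // config fields assumed
        } catch (SQLException e) {
            throw new IOException("Could not establish connection", e);
        }
    }

    public void closeInputFormat() throws IOException {
        try {
            if (dbConn != null) {
                dbConn.close();
                dbConn = null;
            }
        } catch (SQLException e) {
            throw new IOException("Could not close connection", e);
        }
    }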
Being a generic JDBC input format, I would prefer to stay with Row, letting
the developer manage the cast according to the driver's functionality (a sketch follows below).

As for the open() and close() issue, I agree with Flavio that we'd need better management of the InputFormat lifecycle. Perhaps a new interface extending it: RichInputFormat?

my2c.

Stefano
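To illustrate the Row-based output, a hedged sketch of what nextRecord() could look like (the exact Row API and InputFormat contract details are assumptions; real code would signal end-of-input via reachedEnd()):

    // Sketch only: fill a Row via getObject(), which tolerates NULL and leaves any
    // further casting to the user (e.g. in a downstream map to a POJO).
    public Row nextRecord(Row reuse) throws IOException {
        try {
            if (!resultSet.next()) {
                return null;                           // real code would rely on reachedEnd() instead
            }
            final int columnCount = resultSet.getMetaData().getColumnCount();
            for (int pos = 0; pos < columnCount; pos++) {
                reuse.setField(pos, resultSet.getObject(pos + 1)); // JDBC columns are 1-based
            }
            return reuse;
        } catch (SQLException e) {
            throw new IOException("Could not read next record", e);
        }
    }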
Of course there is one already. We'll look into the runtime context.
saluti,
Stefano
There is also InputFormat.configure() which is called before any split
processing happens. But I see your point about a missing close() method that is called after all input splits have been processed.
> >> > > > > >> > > > Cheers, Fabian > >> > > > > >> > > > 2016-04-14 18:47 GMT+02:00 Flavio Pompermaier < > [hidden email] > >> >: > >> > > > > >> > > > > ok thanks!just one last question: an inputformat is instantiated > >> for > >> > > each > >> > > > > task slot or once for task manger? > >> > > > > On 14 Apr 2016 18:07, "Chesnay Schepler" <[hidden email]> > >> wrote: > >> > > > > > >> > > > > > no. > >> > > > > > > >> > > > > > if (connection==null) { > >> > > > > > establishCOnnection(); > >> > > > > > } > >> > > > > > > >> > > > > > done. same connection for all splits. > >> > > > > > > >> > > > > > On 14.04.2016 17:59, Flavio Pompermaier wrote: > >> > > > > > > >> > > > > >> I didn't understand what you mean for "it should also be > >> possible > >> > to > >> > > > > reuse > >> > > > > >> the same connection of an InputFormat across InputSplits, > i.e., > >> > > calls > >> > > > of > >> > > > > >> the open() method". > >> > > > > >> At the moment in the open method there's a call to > >> > > > establishConnection, > >> > > > > >> thus, a new connection is created for each split. > >> > > > > >> If I understood correctly, you're suggesting to create a pool > >> in > >> > the > >> > > > > >> inputFormat and simply call poo.borrow() in the open() rather > >> than > >> > > > > >> establishConnection? > >> > > > > >> > >> > > > > >> On 14 Apr 2016 17:28, "Chesnay Schepler" <[hidden email] > > > >> > > wrote: > >> > > > > >> > >> > > > > >> On 14.04.2016 17:22, Fabian Hueske wrote: > >> > > > > >>> > >> > > > > >>> Hi Flavio, > >> > > > > >>>> > >> > > > > >>>> that are good questions. > >> > > > > >>>> > >> > > > > >>>> 1) Replacing null values by default values and simply > >> forwarding > >> > > > > records > >> > > > > >>>> is > >> > > > > >>>> very dangerous, in my opinion. > >> > > > > >>>> I see two alternatives: A) we use a data type that > tolerates > >> > null > >> > > > > >>>> values. > >> > > > > >>>> This could be a POJO that the user has to provide or Row. > The > >> > > > drawback > >> > > > > >>>> of > >> > > > > >>>> Row is that it is untyped and not easy to handle. B) We use > >> > Tuple > >> > > > and > >> > > > > >>>> add > >> > > > > >>>> an additional field that holds an Integer which serves as a > >> > bitset > >> > > > to > >> > > > > >>>> mark > >> > > > > >>>> null fields. This would be a pretty low level API though. I > >> am > >> > > > leaning > >> > > > > >>>> towards the user-provided POJO option. > >> > > > > >>>> > >> > > > > >>>> i would also lean towards the POJO option. > >> > > > > >>> > >> > > > > >>> 2) The JDBCInputFormat is located in a dedicated Maven > >> module. I > >> > > > think > >> > > > > we > >> > > > > >>>> can add a dependency to that module. However, it should > also > >> be > >> > > > > possible > >> > > > > >>>> to > >> > > > > >>>> reuse the same connection of an InputFormat across > >> InputSplits, > >> > > > i.e., > >> > > > > >>>> calls > >> > > > > >>>> of the open() method. Wouldn't that be sufficient? > >> > > > > >>>> > >> > > > > >>>> this is the right approach imo. > >> > > > > >>> > >> > > > > >>> Best, Fabian > >> > > > > >>>> > >> > > > > >>>> 2016-04-14 16:59 GMT+02:00 Flavio Pompermaier < > >> > > [hidden email] > >> > > > >: > >> > > > > >>>> > >> > > > > >>>> Hi guys, > >> > > > > >>>> > >> > > > > >>>>> I'm integrating the comments of Chesnay to my PR but > >> there's a > >> > > > couple > >> > > > > >>>>> of > >> > > > > >>>>> thing that I'd like to discuss with the core developers. 
> >> > > > > >>>>> > >> > > > > >>>>> > >> > > > > >>>>> 1. about the JDBC type mapping (addValue() method at > >> [1]: > >> > At > >> > > > the > >> > > > > >>>>> moment > >> > > > > >>>>> if I find a null value for a Double, the getDouble > of > >> > jdbc > >> > > > > return > >> > > > > >>>>> 0.0. > >> > > > > >>>>> Is > >> > > > > >>>>> it really the correct behaviour? Wouldn't be better > to > >> > use a > >> > > > > POJO > >> > > > > >>>>> or > >> > > > > >>>>> the > >> > > > > >>>>> Row of datatable that can handle void? Moreover, the > >> > mapping > >> > > > > >>>>> between > >> > > > > >>>>> SQL > >> > > > > >>>>> type and Java types varies much from the single JDBC > >> > > > > >>>>> implementation. > >> > > > > >>>>> Wouldn't be better to rely on the Java type coming > from > >> > > using > >> > > > > >>>>> resultSet.getObject() to get such a mapping rather > than > >> > > using > >> > > > > the > >> > > > > >>>>> ResultSetMetadata types? > >> > > > > >>>>> 2. I'd like to handle connections very efficiently > >> because > >> > > we > >> > > > > >>>>> have a > >> > > > > >>>>> use > >> > > > > >>>>> case with billions of records and thus millions of > >> splits > >> > > and > >> > > > > >>>>> establish > >> > > > > >>>>> a > >> > > > > >>>>> new connection each time could be expensive. Would it > >> be a > >> > > > > >>>>> problem to > >> > > > > >>>>> add > >> > > > > >>>>> apache pool dependency to the jdbc batch connector in > >> > order > >> > > to > >> > > > > >>>>> reuase > >> > > > > >>>>> the > >> > > > > >>>>> created connections? > >> > > > > >>>>> > >> > > > > >>>>> > >> > > > > >>>>> [1] > >> > > > > >>>>> > >> > > > > >>>>> > >> > > > > >>>>> > >> > > > > >>>>> > >> > > > > > >> > > > > >> > > > >> > > >> > https://github.com/fpompermaier/flink/blob/FLINK-3750/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java > >> > > > > >>>>> > >> > > > > >>>>> > >> > > > > >>>>> > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > > > > > |
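To make the lifecycle discussion concrete, here is a minimal sketch of the connection-reuse idea from this thread: connect lazily in the per-split open(), release only per-split resources in close(), and tear the connection down once all splits are done. This is not the actual JDBCInputFormat code from the PR; the class, field, and method names are illustrative only.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Illustrative only: one JDBC connection shared by all splits handled by the
// same parallel instance; per-split resources are opened and closed per split.
public class ConnectionReuseSketch {

    private final String dbUrl;
    private Connection dbConn;            // survives across splits
    private PreparedStatement statement;  // per-split
    private ResultSet resultSet;          // per-split

    public ConnectionReuseSketch(String dbUrl) {
        this.dbUrl = dbUrl;
    }

    // corresponds to open(split): connect only if no connection exists yet
    public void openSplit(String splitQuery) throws SQLException {
        if (dbConn == null) {
            dbConn = DriverManager.getConnection(dbUrl);
        }
        statement = dbConn.prepareStatement(splitQuery);
        resultSet = statement.executeQuery();
    }

    // corresponds to close(): release only the per-split resources
    public void closeSplit() throws SQLException {
        if (resultSet != null) { resultSet.close(); }
        if (statement != null) { statement.close(); }
    }

    // the missing hook discussed here: called once, after the last split
    public void closeInputFormat() throws SQLException {
        if (dbConn != null) {
            dbConn.close();
            dbConn = null;
        }
    }
}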
Yes, I forgot to mention that I could instantiate the connection in
configure(), but then I can't close it (as you confirmed) :(

On Mon, Apr 18, 2016 at 9:46 AM, Aljoscha Krettek <[hidden email]> wrote:

> There is also InputFormat.configure(), which is called before any split
> processing happens. But I see your point about a missing close() method
> that is called after all input splits have been processed. |
I agree, a method to close an input format is missing.
InputFormat is an API-stable interface, so it is not possible to extend it
(until Flink 2.0). RichInputFormat is API-stable as well, but an abstract
class. So it should be possible to add an empty default implementation of
a closeInputFormat() method there.

Of course it would be good to re-use connections across input splits.
On the other hand, input splits should not be too fine-grained either,
because input split assignment itself has some overhead.

Best, Fabian

2016-04-18 9:49 GMT+02:00 Flavio Pompermaier <[hidden email]>:

> Yes, I forgot to mention that I could instantiate the connection in
> configure(), but then I can't close it (as you confirmed) :( |
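A rough sketch of the idea Fabian describes above: empty default implementations on an abstract base class, so that existing subclasses keep compiling and behaving as before. The class name and signatures below are simplified stand-ins, not Flink's actual RichInputFormat API.

// Simplified stand-in for an abstract input format base class; the generic
// parameter and method signatures are illustrative, not Flink's actual API.
public abstract class AbstractInputFormatSketch<S> {

    // existing per-split lifecycle, called once for every input split
    public abstract void open(S split) throws Exception;
    public abstract void close() throws Exception;

    // proposed hooks with empty default implementations, called once per
    // parallel instance: before the first split and after the last split
    public void openInputFormat() throws Exception {
        // intentionally empty, so existing subclasses are unaffected
    }

    public void closeInputFormat() throws Exception {
        // intentionally empty
    }
}

A JDBC format could then move connection setup into openInputFormat() and connection teardown into closeInputFormat(), keeping open()/close() purely per-split.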
Hi Fabian, I've just created a JIRA for that (FLINK-3777).
As you said, input splits should not be too fine-grained, but we have a
table with 11 billion rows that can't be queried with ranges greater than
100K rows, because it has a lot of JOINs and increasing this threshold
implies incredibly longer response times. This implies millions of splits
and, thus, millions of calls to open() and connection re-creation.. :(

On Mon, Apr 18, 2016 at 12:01 PM, Fabian Hueske <[hidden email]> wrote:

> I agree, a method to close an input format is missing.
> InputFormat is an API-stable interface, so it is not possible to extend it
> (until Flink 2.0). RichInputFormat is API-stable as well, but an abstract
> class. So it should be possible to add an empty default implementation of
> a closeInputFormat() method there.
>
> Of course it would be good to re-use connections across input splits.
> On the other hand, input splits should not be too fine-grained either,
> because input split assignment itself has some overhead.
>
> Best, Fabian |
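As an illustration of the range-bounded querying described above, assuming the big table has an indexed numeric key: the table name, column name, and helper below are hypothetical, not the actual schema or the PR's split-generation code.

import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: one query per key range so that no single split scans
// more than rangeSize rows of the large table.
public class RangeQuerySketch {

    public static List<String> rangeQueries(long minKey, long maxKey, long rangeSize) {
        List<String> queries = new ArrayList<>();
        for (long start = minKey; start <= maxKey; start += rangeSize) {
            long end = Math.min(start + rangeSize - 1, maxKey);
            queries.add("SELECT * FROM big_table WHERE id BETWEEN " + start + " AND " + end);
        }
        return queries;
    }

    public static void main(String[] args) {
        // e.g. ranges of 100K keys; with billions of keys this yields a huge
        // number of splits, which is why reusing the connection matters
        List<String> splits = rangeQueries(0L, 1_000_000L, 100_000L);
        System.out.println(splits.size() + " split queries generated");
    }
}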
As Flavio underlined, it is not about selecting a certain number of rows,
but about executing queries with a sequence of joins on a very large
database. We played around to find the best throughput. Honestly, I prefer
to have many smaller range-queries with more parallel threads than fewer
expensive queries. Of course this makes sense only if we can reuse the
connection.

Flavio created a PR that should provide a good starting point for dealing
with these types of connections.

Thanks a lot for the great support!

saluti,
Stefano

2016-04-18 12:08 GMT+02:00 Flavio Pompermaier <[hidden email]>:

> Hi Fabian, I've just created a JIRA for that (FLINK-3777).
> As you said, input splits should not be too fine-grained, but we have a
> table with 11 billion rows that can't be queried with ranges greater than
> 100K rows, because it has a lot of JOINs and increasing this threshold
> implies incredibly longer response times. This implies millions of splits
> and, thus, millions of calls to open() and connection re-creation.. :( |