[GitHub] incubator-flink pull request: [FLINK-933] Add primitive input form...

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-flink pull request: [FLINK-933] Add primitive input form...

zentol
GitHub user qmlmoon opened a pull request:

    https://github.com/apache/incubator-flink/pull/47

    [FLINK-933] Add primitive input format to read a sequence of primitives

   

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/qmlmoon/incubator-flink primitive

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/47.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #47
   
----
commit 2ed34fd072d46d26a1e62b522cec7cc21653970e
Author: mingliang <[hidden email]>
Date:   2014-06-25T16:54:36Z

    [FLINK-933] Add primitive input format to read a sequence of primitives

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-flink pull request: [FLINK-933] Add primitive input form...

zentol
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/47#discussion_r14237434
 
    --- Diff: stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrimitiveInputFormat.java ---
    @@ -0,0 +1,71 @@
    +/***********************************************************************************************************************
    + *
    + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + *
    + **********************************************************************************************************************/
    +package eu.stratosphere.api.java.io;
    +
    +import eu.stratosphere.api.common.io.DelimitedInputFormat;
    +import eu.stratosphere.core.fs.Path;
    +import eu.stratosphere.types.parser.FieldParser;
    +import eu.stratosphere.util.InstantiationUtil;
    +
    +/**
    + * An input format that reads single field primitive data from a given file. The difference between this and
    + * {@link eu.stratosphere.api.java.io.CsvInputFormat} is that it won't go through {@link eu.stratosphere.api.java.tuple.Tuple1}.
    + */
    +public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
    +
    + private Class<OT> primitiveClass;
    +
    + private static final byte CARRIAGE_RETURN = (byte) '\r';
    +
    + private static final byte NEW_LINE = (byte) '\n';
    +
    +
    + public PrimitiveInputFormat(Path filePath, Class<OT> primitiveClass) {
    + super(filePath);
    + Class<? extends FieldParser<OT>> parserType = FieldParser.getParserForType(primitiveClass);
    + if (parserType == null) {
    + throw new IllegalArgumentException("The type '" + primitiveClass.getName() + "' is not supported for the primitive input format.");
    + }
    + this.primitiveClass = primitiveClass;
    + }
    +
    + public PrimitiveInputFormat(Path filePath, char delimiter, Class<OT> primitiveClass) {
    + super(filePath);
    + Class<? extends FieldParser<OT>> parserType = FieldParser.getParserForType(primitiveClass);
    + if (parserType == null) {
    + throw new IllegalArgumentException("The type '" + primitiveClass.getName() + "' is not supported for the primitive input format.");
    + }
    + this.primitiveClass = primitiveClass;
    + this.setDelimiter(delimiter);
    + }
    +
    +
    + @Override
    + public OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) {
    + //Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
    + if (this.getDelimiter() != null && this.getDelimiter().length == 1
    + && this.getDelimiter()[0] == NEW_LINE && offset+numBytes >= 1
    + && bytes[offset+numBytes-1] == CARRIAGE_RETURN){
    + numBytes -= 1;
    + }
    +
    + Class<? extends FieldParser<OT>> parserType = FieldParser.getParserForType(this.primitiveClass);
    + @SuppressWarnings("unchecked")
    + FieldParser<OT> p = (FieldParser<OT>) InstantiationUtil.instantiate(parserType, FieldParser.class);
    + p.parseField(bytes, offset, numBytes + offset, (char) this.getDelimiter()[0], reuse);
    --- End diff --
   
    I think this is not very efficient since you are creating the field parser for every record.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-flink pull request: [FLINK-933] Add primitive input form...

zentol
In reply to this post by zentol
Github user qmlmoon commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/47#discussion_r14237611
 
    --- Diff: stratosphere-java/src/main/java/eu/stratosphere/api/java/io/PrimitiveInputFormat.java ---
    @@ -0,0 +1,71 @@
    +/***********************************************************************************************************************
    + *
    + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + *
    + **********************************************************************************************************************/
    +package eu.stratosphere.api.java.io;
    +
    +import eu.stratosphere.api.common.io.DelimitedInputFormat;
    +import eu.stratosphere.core.fs.Path;
    +import eu.stratosphere.types.parser.FieldParser;
    +import eu.stratosphere.util.InstantiationUtil;
    +
    +/**
    + * An input format that reads single field primitive data from a given file. The difference between this and
    + * {@link eu.stratosphere.api.java.io.CsvInputFormat} is that it won't go through {@link eu.stratosphere.api.java.tuple.Tuple1}.
    + */
    +public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
    +
    + private Class<OT> primitiveClass;
    +
    + private static final byte CARRIAGE_RETURN = (byte) '\r';
    +
    + private static final byte NEW_LINE = (byte) '\n';
    +
    +
    + public PrimitiveInputFormat(Path filePath, Class<OT> primitiveClass) {
    + super(filePath);
    + Class<? extends FieldParser<OT>> parserType = FieldParser.getParserForType(primitiveClass);
    + if (parserType == null) {
    + throw new IllegalArgumentException("The type '" + primitiveClass.getName() + "' is not supported for the primitive input format.");
    + }
    + this.primitiveClass = primitiveClass;
    + }
    +
    + public PrimitiveInputFormat(Path filePath, char delimiter, Class<OT> primitiveClass) {
    + super(filePath);
    + Class<? extends FieldParser<OT>> parserType = FieldParser.getParserForType(primitiveClass);
    + if (parserType == null) {
    + throw new IllegalArgumentException("The type '" + primitiveClass.getName() + "' is not supported for the primitive input format.");
    + }
    + this.primitiveClass = primitiveClass;
    + this.setDelimiter(delimiter);
    + }
    +
    +
    + @Override
    + public OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) {
    + //Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
    + if (this.getDelimiter() != null && this.getDelimiter().length == 1
    + && this.getDelimiter()[0] == NEW_LINE && offset+numBytes >= 1
    + && bytes[offset+numBytes-1] == CARRIAGE_RETURN){
    + numBytes -= 1;
    + }
    +
    + Class<? extends FieldParser<OT>> parserType = FieldParser.getParserForType(this.primitiveClass);
    + @SuppressWarnings("unchecked")
    + FieldParser<OT> p = (FieldParser<OT>) InstantiationUtil.instantiate(parserType, FieldParser.class);
    + p.parseField(bytes, offset, numBytes + offset, (char) this.getDelimiter()[0], reuse);
    --- End diff --
   
    thanks, good point! I think it should in open method


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-flink pull request: [FLINK-933] Add primitive input form...

zentol
In reply to this post by zentol
Github user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/47#discussion_r14289113
 
    --- Diff: stratosphere-java/src/test/java/eu/stratosphere/api/java/io/PrimitiveInputFormatTest.java ---
    @@ -0,0 +1,170 @@
    +/***********************************************************************************************************************
    + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + **********************************************************************************************************************/
    +
    +package eu.stratosphere.api.java.io;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.File;
    +import java.io.FileWriter;
    +import java.io.IOException;
    +
    +import org.apache.log4j.Level;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +
    +import eu.stratosphere.configuration.Configuration;
    +import eu.stratosphere.core.fs.FileInputSplit;
    +import eu.stratosphere.core.fs.Path;
    +import eu.stratosphere.util.LogUtils;
    +
    +public class PrimitiveInputFormatTest {
    +
    + private static final Path PATH = new Path("an/ignored/file/");
    +
    + @BeforeClass
    + public static void initialize() {
    + LogUtils.initializeDefaultConsoleLogger(Level.WARN);
    + }
    +
    + @Test
    + public void testStringInput() {
    + try {
    + final String fileContent = "abc|def||";
    + final FileInputSplit split = createTempFile(fileContent);
    +
    + final PrimitiveInputFormat<String> format = new PrimitiveInputFormat<String>(PATH, '|', String.class);
    +
    + final Configuration parameters = new Configuration();
    + format.configure(parameters);
    + format.open(split);
    +
    + String result = null;
    +
    + result = format.nextRecord(result);
    + assertEquals("abc", result);
    +
    + result = format.nextRecord(result);
    + assertEquals("def", result);
    +
    + result = format.nextRecord(result);
    + assertEquals("", result);
    +
    + result = format.nextRecord(result);
    + assertNull(result);
    + assertTrue(format.reachedEnd());
    + }
    + catch (Exception ex) {
    + ex.printStackTrace();
    + fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    + }
    + }
    +
    +
    +
    + @Test
    + public void testIntegerInput() throws IOException {
    + try {
    + final String fileContent = "111|222|";
    + final FileInputSplit split = createTempFile(fileContent);
    +
    + final PrimitiveInputFormat<Integer> format = new PrimitiveInputFormat<Integer>(PATH,'|', Integer.class);
    +
    + format.configure(new Configuration());
    + format.open(split);
    +
    + Integer result = null;
    + result = format.nextRecord(result);
    + assertEquals(Integer.valueOf(111), result);
    +
    + result = format.nextRecord(result);
    + assertEquals(Integer.valueOf(222), result);
    +
    + result = format.nextRecord(result);
    + assertNull(result);
    + assertTrue(format.reachedEnd());
    + }
    + catch (Exception ex) {
    + fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    + }
    + }
    +
    + @Test
    + public void testDoubleInputLinewise() throws IOException {
    + try {
    + final String fileContent = "1.21\n2.23\n";
    + final FileInputSplit split = createTempFile(fileContent);
    +
    + final PrimitiveInputFormat<Double> format = new PrimitiveInputFormat<Double>(PATH, Double.class);
    +
    + format.configure(new Configuration());
    + format.open(split);
    +
    + Double result = null;
    + result = format.nextRecord(result);
    + assertEquals(Double.valueOf(1.21), result);
    +
    + result = format.nextRecord(result);
    + assertEquals(Double.valueOf(2.23), result);
    +
    + result = format.nextRecord(result);
    + assertNull(result);
    + assertTrue(format.reachedEnd());
    + }
    + catch (Exception ex) {
    + fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    + }
    + }
    +
    + private FileInputSplit createTempFile(String content) throws IOException {
    --- End diff --
   
    I found it confusing that the method is called createTempFile, but returns FileInputSplit.
   
    This is up to you (aka matter of taste), but I find it clearer if the tests are after each other and the helper methods come at the end.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-flink pull request: [FLINK-933] Add primitive input form...

zentol
In reply to this post by zentol
Github user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/47#discussion_r14289124
 
    --- Diff: stratosphere-java/src/test/java/eu/stratosphere/api/java/io/PrimitiveInputFormatTest.java ---
    @@ -0,0 +1,170 @@
    +/***********************************************************************************************************************
    + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + **********************************************************************************************************************/
    +
    +package eu.stratosphere.api.java.io;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.File;
    +import java.io.FileWriter;
    +import java.io.IOException;
    +
    +import org.apache.log4j.Level;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +
    +import eu.stratosphere.configuration.Configuration;
    +import eu.stratosphere.core.fs.FileInputSplit;
    +import eu.stratosphere.core.fs.Path;
    +import eu.stratosphere.util.LogUtils;
    +
    +public class PrimitiveInputFormatTest {
    +
    + private static final Path PATH = new Path("an/ignored/file/");
    +
    + @BeforeClass
    + public static void initialize() {
    + LogUtils.initializeDefaultConsoleLogger(Level.WARN);
    + }
    +
    + @Test
    + public void testStringInput() {
    + try {
    + final String fileContent = "abc|def||";
    + final FileInputSplit split = createTempFile(fileContent);
    +
    + final PrimitiveInputFormat<String> format = new PrimitiveInputFormat<String>(PATH, '|', String.class);
    +
    + final Configuration parameters = new Configuration();
    + format.configure(parameters);
    + format.open(split);
    +
    + String result = null;
    +
    + result = format.nextRecord(result);
    + assertEquals("abc", result);
    +
    + result = format.nextRecord(result);
    + assertEquals("def", result);
    +
    + result = format.nextRecord(result);
    + assertEquals("", result);
    +
    + result = format.nextRecord(result);
    + assertNull(result);
    + assertTrue(format.reachedEnd());
    + }
    + catch (Exception ex) {
    + ex.printStackTrace();
    + fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    + }
    + }
    +
    +
    +
    + @Test
    + public void testIntegerInput() throws IOException {
    + try {
    + final String fileContent = "111|222|";
    + final FileInputSplit split = createTempFile(fileContent);
    +
    + final PrimitiveInputFormat<Integer> format = new PrimitiveInputFormat<Integer>(PATH,'|', Integer.class);
    +
    + format.configure(new Configuration());
    + format.open(split);
    +
    + Integer result = null;
    + result = format.nextRecord(result);
    + assertEquals(Integer.valueOf(111), result);
    +
    + result = format.nextRecord(result);
    + assertEquals(Integer.valueOf(222), result);
    +
    + result = format.nextRecord(result);
    + assertNull(result);
    + assertTrue(format.reachedEnd());
    + }
    + catch (Exception ex) {
    + fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    + }
    + }
    +
    + @Test
    + public void testDoubleInputLinewise() throws IOException {
    + try {
    + final String fileContent = "1.21\n2.23\n";
    + final FileInputSplit split = createTempFile(fileContent);
    +
    + final PrimitiveInputFormat<Double> format = new PrimitiveInputFormat<Double>(PATH, Double.class);
    +
    + format.configure(new Configuration());
    + format.open(split);
    +
    + Double result = null;
    + result = format.nextRecord(result);
    + assertEquals(Double.valueOf(1.21), result);
    +
    + result = format.nextRecord(result);
    + assertEquals(Double.valueOf(2.23), result);
    +
    + result = format.nextRecord(result);
    + assertNull(result);
    + assertTrue(format.reachedEnd());
    + }
    + catch (Exception ex) {
    + fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    + }
    + }
    +
    + private FileInputSplit createTempFile(String content) throws IOException {
    + File tempFile = File.createTempFile("test_contents", "tmp");
    + tempFile.deleteOnExit();
    +
    + FileWriter wrt = new FileWriter(tempFile);
    + wrt.write(content);
    + wrt.close();
    +
    + return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
    + }
    +
    + @Test
    + public void testRemovingTrailingCR() {
    + try {
    + String first = "First line";
    + String second = "Second line";
    + String fileContent = first + "\r\n" + second + "\r\n";
    + final FileInputSplit split = createTempFile(fileContent);
    +
    + final PrimitiveInputFormat<String> format = new PrimitiveInputFormat<String>(PATH ,String.class);
    +
    + format.configure(new Configuration());
    + format.open(split);
    +
    + String result = null;
    +
    + result = format.nextRecord(result);
    + assertEquals(first, result);
    +
    + result = format.nextRecord(result);
    + assertEquals(second, result);
    +
    --- End diff --
   
    Unnecessary empty line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
Reply | Threaded
Open this post in threaded view
|

[GitHub] incubator-flink pull request: [FLINK-933] Add primitive input form...

zentol
In reply to this post by zentol
Github user qmlmoon commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/47#discussion_r14293149
 
    --- Diff: stratosphere-java/src/test/java/eu/stratosphere/api/java/io/PrimitiveInputFormatTest.java ---
    @@ -0,0 +1,170 @@
    +/***********************************************************************************************************************
    + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + **********************************************************************************************************************/
    +
    +package eu.stratosphere.api.java.io;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +import java.io.File;
    +import java.io.FileWriter;
    +import java.io.IOException;
    +
    +import org.apache.log4j.Level;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +
    +import eu.stratosphere.configuration.Configuration;
    +import eu.stratosphere.core.fs.FileInputSplit;
    +import eu.stratosphere.core.fs.Path;
    +import eu.stratosphere.util.LogUtils;
    +
    +public class PrimitiveInputFormatTest {
    +
    + private static final Path PATH = new Path("an/ignored/file/");
    +
    + @BeforeClass
    + public static void initialize() {
    + LogUtils.initializeDefaultConsoleLogger(Level.WARN);
    + }
    +
    + @Test
    + public void testStringInput() {
    + try {
    + final String fileContent = "abc|def||";
    + final FileInputSplit split = createTempFile(fileContent);
    +
    + final PrimitiveInputFormat<String> format = new PrimitiveInputFormat<String>(PATH, '|', String.class);
    +
    + final Configuration parameters = new Configuration();
    + format.configure(parameters);
    + format.open(split);
    +
    + String result = null;
    +
    + result = format.nextRecord(result);
    + assertEquals("abc", result);
    +
    + result = format.nextRecord(result);
    + assertEquals("def", result);
    +
    + result = format.nextRecord(result);
    + assertEquals("", result);
    +
    + result = format.nextRecord(result);
    + assertNull(result);
    + assertTrue(format.reachedEnd());
    + }
    + catch (Exception ex) {
    + ex.printStackTrace();
    + fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    + }
    + }
    +
    +
    +
    + @Test
    + public void testIntegerInput() throws IOException {
    + try {
    + final String fileContent = "111|222|";
    + final FileInputSplit split = createTempFile(fileContent);
    +
    + final PrimitiveInputFormat<Integer> format = new PrimitiveInputFormat<Integer>(PATH,'|', Integer.class);
    +
    + format.configure(new Configuration());
    + format.open(split);
    +
    + Integer result = null;
    + result = format.nextRecord(result);
    + assertEquals(Integer.valueOf(111), result);
    +
    + result = format.nextRecord(result);
    + assertEquals(Integer.valueOf(222), result);
    +
    + result = format.nextRecord(result);
    + assertNull(result);
    + assertTrue(format.reachedEnd());
    + }
    + catch (Exception ex) {
    + fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    + }
    + }
    +
    + @Test
    + public void testDoubleInputLinewise() throws IOException {
    + try {
    + final String fileContent = "1.21\n2.23\n";
    + final FileInputSplit split = createTempFile(fileContent);
    +
    + final PrimitiveInputFormat<Double> format = new PrimitiveInputFormat<Double>(PATH, Double.class);
    +
    + format.configure(new Configuration());
    + format.open(split);
    +
    + Double result = null;
    + result = format.nextRecord(result);
    + assertEquals(Double.valueOf(1.21), result);
    +
    + result = format.nextRecord(result);
    + assertEquals(Double.valueOf(2.23), result);
    +
    + result = format.nextRecord(result);
    + assertNull(result);
    + assertTrue(format.reachedEnd());
    + }
    + catch (Exception ex) {
    + fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
    + }
    + }
    +
    + private FileInputSplit createTempFile(String content) throws IOException {
    --- End diff --
   
    Yes, you are right. I also updated this in CsvInputFormatTest in the latest commit


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---