Hi there,
I cannot figure out how the Scala base types (e.g. scala.Int, scala.Double, etc.) are mapped to the Flink runtime.

It seems that they are not treated the same as their Java counterparts (e.g. java.lang.Integer, java.lang.Double). For example, if I write the following code:

    val inputFormat = new CsvInputFormat[FlinkTuple3[Int, Int, String]](
      inPath, "\n", '\t', classOf[Int], classOf[Int], classOf[String])
    val typeInformation = new fjava.typeutils.TupleTypeInfo[FlinkTuple3[Int, Int, String]](
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.STRING_TYPE_INFO)
    env.createInput(inputFormat, typeInformation)

I get the following error:

    Exception in thread "main" java.lang.IllegalArgumentException: The type 'int' is not supported for the CSV input format.
        at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldTypesGeneric(GenericCsvInputFormat.java:158)
        at org.apache.flink.api.java.io.CsvInputFormat.setFieldTypes(CsvInputFormat.java:133)
        at org.apache.flink.api.java.io.CsvInputFormat.<init>(CsvInputFormat.java:83)
        ...

I also couldn't find pre-defined TypeInformation implementations for these basic Scala types (similar to what we have in BasicTypeInfo), or macro code that synthesizes them.

Can somebody elaborate on that?

Thanks,
A.
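A quick plain-Scala check (no Flink required) of what classOf[Int] evaluates to may help narrow this down. The error message above names the Java primitive 'int', which is consistent with classOf[Int] being Java's int.class (java.lang.Integer.TYPE) rather than the boxed java.lang.Integer class. The sketch only uses the Scala standard library and java.lang; the object name ClassOfInt is illustrative.

```scala
object ClassOfInt {
  def main(args: Array[String]): Unit = {
    // classOf[Int] evaluates to the JVM primitive class, i.e. Java's int.class
    println(classOf[Int])                               // prints "int"
    println(classOf[Int] == java.lang.Integer.TYPE)     // prints "true"
    println(classOf[Int].isPrimitive)                   // prints "true"

    // The boxed Java counterpart is a different Class object
    println(classOf[java.lang.Integer])                 // prints "class java.lang.Integer"
    println(classOf[Int] == classOf[java.lang.Integer]) // prints "false"
  }
}
```

So any check that is keyed on the boxed classes will not match what classOf[Int] produces, even though both describe a 32-bit integer.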
Just to clarify, in order to spare us some time in the discussion: I
*deliberately* want to use the Flink Java API from Scala with Scala core types.

2015-01-20 18:53 GMT+01:00 Alexander Alexandrov <[hidden email]>:
> [...]
Hi!
The basic types hold the Java primitives and their boxed versions. Since "scala.Int" boils down to the Java int primitive type at runtime, they should be interoperable.

I would guess that the problem is that "classOf[Int]" actually returns (in Java terms) "scala.Int.class", which is not among the recognized primitive types. We could add those, I assume, to make it work.

The known basic types are statically set up in the class "BasicTypeInfo". You can have a look and try whether adding the Scala core types solves the problem. The only issue is that you would have to add the Scala library as a dependency to "flink-java", or register the Scala types via reflection/class name.

Greetings,
Stephan

On Tue, Jan 20, 2015 at 10:08 AM, Alexander Alexandrov <[hidden email]> wrote:
> [...]
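Besides extending BasicTypeInfo, a user-side alternative is to map primitive classes to their boxed java.lang counterparts before handing them to the Java API. This is a minimal sketch, assuming (not verified against the Flink version discussed here) that CsvInputFormat accepts boxed classes such as java.lang.Integer; the names BoxFieldTypes, boxedClasses and boxed are hypothetical helpers, not Flink API.

```scala
object BoxFieldTypes {
  // Hypothetical lookup table: primitive JVM classes (what classOf[Int] etc.
  // evaluate to) mapped to their boxed java.lang counterparts.
  val boxedClasses: Map[Class[_], Class[_]] = Map(
    classOf[Boolean] -> classOf[java.lang.Boolean],
    classOf[Byte]    -> classOf[java.lang.Byte],
    classOf[Short]   -> classOf[java.lang.Short],
    classOf[Int]     -> classOf[java.lang.Integer],
    classOf[Long]    -> classOf[java.lang.Long],
    classOf[Float]   -> classOf[java.lang.Float],
    classOf[Double]  -> classOf[java.lang.Double],
    classOf[Char]    -> classOf[java.lang.Character]
  )

  // Returns the boxed class for a primitive class; any other class is returned unchanged.
  def boxed(c: Class[_]): Class[_] = boxedClasses.getOrElse(c, c)

  def main(args: Array[String]): Unit = {
    val fieldTypes = Seq[Class[_]](classOf[Int], classOf[Int], classOf[String]).map(boxed)
    // prints "java.lang.Integer, java.lang.Integer, java.lang.String"
    println(fieldTypes.map(_.getName).mkString(", "))
  }
}
```

The resulting sequence could then be passed where the original snippet used classOf[Int], e.g. as `fieldTypes: _*` for the varargs type parameters; whether that is enough for the CSV parser lookup is an assumption to be checked.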