Representing Scala base types in the Flink RT

aalexandrov
Hi there,

I cannot figure out how the Scala base types (e.g. scala.Int, scala.Double,
etc.) are mapped to the Flink runtime.

It seems that they are not treated the same as their Java counterparts
(e.g. java.lang.Integer, java.lang.Double). For example, if I write the
following code:

val inputFormat = new CsvInputFormat[FlinkTuple3[Int, Int, String]](
  inPath, "\n", '\t', classOf[Int], classOf[Int], classOf[String])
val typeInformation = new fjava.typeutils.TupleTypeInfo[FlinkTuple3[Int, Int, String]](
  BasicTypeInfo.INT_TYPE_INFO,
  BasicTypeInfo.INT_TYPE_INFO,
  BasicTypeInfo.STRING_TYPE_INFO)
env.createInput(inputFormat, typeInformation)

I get the following error:

Exception in thread "main" java.lang.IllegalArgumentException: The type 'int' is not supported for the CSV input format.
    at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldTypesGeneric(GenericCsvInputFormat.java:158)
    at org.apache.flink.api.java.io.CsvInputFormat.setFieldTypes(CsvInputFormat.java:133)
    at org.apache.flink.api.java.io.CsvInputFormat.<init>(CsvInputFormat.java:83)
    ...

I also couldn't find pre-defined TypeInformation implementations for these
basic Scala types (similar to what we have in BasicTypeInfo), or macro code
that synthesizes them.

Can somebody elaborate on that?

Thanks,
A.

Re: Representing Scala base types in the Flink RT

aalexandrov
Just to clarify, and to save us some time in the discussion: I
*deliberately* want to use the Flink Java API from Scala with Scala core types.

2015-01-20 18:53 GMT+01:00 Alexander Alexandrov <
[hidden email]>:

> [...]

Re: Representing Scala base types in the Flink RT

Stephan Ewen
Hi!

The basic types cover the Java primitives and their boxed versions. Since
"scala.Int" boils down to the Java int primitive type at runtime, they
should be interoperable.

I would guess that the problem is that "classOf[Int]" actually returns (in
Java terms) "scala.Int.class", which is not recognized by the primitive
types. We could add those, I assume, to make it work.
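
A quick way to test this guess is to print what "classOf[Int]" resolves to on the JVM. The sketch below is plain Scala with no Flink dependency; the names ClassOfCheck and describe are made up for illustration. On the Scala compilers I am aware of, "classOf[Int]" evaluates to the JVM primitive class (the same object as java.lang.Integer.TYPE), which would match the 'int' in the error message above, so the guess may need revisiting.

```scala
object ClassOfCheck {
  // Hypothetical helper: render a class name together with its primitive flag.
  def describe(cls: Class[_]): String =
    s"${cls.getName} (primitive: ${cls.isPrimitive})"

  def main(args: Array[String]): Unit = {
    // Scala's Int as seen through classOf
    println(describe(classOf[Int]))               // int (primitive: true)
    // The boxed Java counterpart
    println(describe(classOf[java.lang.Integer])) // java.lang.Integer (primitive: false)
    // classOf[Int] is the same Class object as Java's int.class
    println(classOf[Int] == java.lang.Integer.TYPE) // true
  }
}
```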

The known basic types are statically set up in the class "BasicTypeInfo".
You can have a look and check whether adding the Scala core types solves the
problem.
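
As a caller-side alternative to changing "BasicTypeInfo", one could translate the classes before handing them to the Java API. The sketch below is only an illustration: Boxing and boxed are hypothetical names, and it assumes (not verified here) that the CSV input format accepts boxed classes such as java.lang.Integer, which the rejection of 'int' in the error message suggests but does not prove.

```scala
object Boxing {
  // Each JVM primitive class paired with its boxed wrapper class.
  private val primitiveToBoxed: Map[Class[_], Class[_]] = Map(
    classOf[Boolean] -> classOf[java.lang.Boolean],
    classOf[Byte]    -> classOf[java.lang.Byte],
    classOf[Char]    -> classOf[java.lang.Character],
    classOf[Short]   -> classOf[java.lang.Short],
    classOf[Int]     -> classOf[java.lang.Integer],
    classOf[Long]    -> classOf[java.lang.Long],
    classOf[Float]   -> classOf[java.lang.Float],
    classOf[Double]  -> classOf[java.lang.Double]
  )

  // Hypothetical helper: return the boxed class for a primitive class,
  // and leave every other class (e.g. String) unchanged.
  def boxed(cls: Class[_]): Class[_] = primitiveToBoxed.getOrElse(cls, cls)
}
```

With such a helper, the field types from the original example would become Boxing.boxed(classOf[Int]) and so on, while classOf[String] passes through untouched.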

The only issue is that you would have to add the Scala library as a
dependency to "flink-java", or register the Scala types via
reflection/class name.
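
The class-name route could look roughly like the sketch below: resolve a type by its fully qualified name at runtime and skip it when it is not on the classpath, so no compile-time dependency is needed. It is written in Scala only to match the rest of the thread; OptionalTypes and lookup are hypothetical names, and how the resolved classes would then be registered with "BasicTypeInfo" is left open.

```scala
import scala.util.Try

object OptionalTypes {
  // Hypothetical helper: resolve a class by name, returning None when the
  // class cannot be found on the current classpath.
  def lookup(className: String): Option[Class[_]] =
    Try(Class.forName(className): Class[_]).toOption

  def main(args: Array[String]): Unit = {
    // A class that is always available on the JVM
    println(lookup("java.lang.Integer").isDefined)
    // A class name that does not exist resolves to None instead of throwing
    println(lookup("no.such.Type").isDefined)
  }
}
```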

Greetings,
Stephan


On Tue, Jan 20, 2015 at 10:08 AM, Alexander Alexandrov <
[hidden email]> wrote:

> [...]