[jira] [Created] (FLINK-19244) CsvRowDataDeserializationSchema throws cast exception : Row length mismatch.


Shang Yuanchun (Jira)
Ying Z created FLINK-19244:
------------------------------

             Summary: CsvRowDataDeserializationSchema throws cast exception : Row length mismatch.
                 Key: FLINK-19244
                 URL: https://issues.apache.org/jira/browse/FLINK-19244
             Project: Flink
          Issue Type: Bug
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.11.0
            Reporter: Ying Z


CREATE TABLE csv_table (
 f0 ROW<f0c0 VARCHAR, f0c1 VARCHAR>,
 f1 ROW<f1c0 INT, f1c1 VARCHAR>
 )

If f0 is null and f1 is (f1c0=123, f1c1=456), the serialized data is: ,123;456
When this data is deserialized, the JsonNode for f0 is an empty array ([]), so the converter throws an exception: Row length mismatch. 2 fields expected but was 0.
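
To make the failure easier to reason about without Flink on the classpath, here is a minimal plain-Java sketch of the round trip. It is only a model of the behaviour described above, not Flink's actual code: the class name CsvNestedRowSketch and all of its methods are made up for illustration, and the lenient variant is just one possible fix, assuming an empty cell should be read back as a null row.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of the CSV round trip; none of these names exist in Flink.
public class CsvNestedRowSketch {
    static final String FIELD_DELIMITER = ",";
    static final String ARRAY_ELEMENT_DELIMITER = ";";

    // Serialize a row of nested rows; a null nested row becomes an empty cell.
    static String serialize(List<List<String>> row) {
        return row.stream()
                .map(nested -> nested == null ? "" : String.join(ARRAY_ELEMENT_DELIMITER, nested))
                .collect(Collectors.joining(FIELD_DELIMITER));
    }

    // Parse one cell into its elements; an empty cell yields zero elements (the "[]" case).
    static List<String> parseCell(String cell) {
        if (cell.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(cell.split(ARRAY_ELEMENT_DELIMITER, -1));
    }

    // Strict conversion: rejects any element count other than the expected arity.
    static List<String> toRowStrict(List<String> elements, int arity) {
        if (elements.size() != arity) {
            throw new RuntimeException("Row length mismatch. " + arity
                    + " fields expected but was " + elements.size() + ".");
        }
        return elements;
    }

    // Lenient conversion (assumed fix): an empty cell is read back as a null row.
    static List<String> toRowLenient(List<String> elements, int arity) {
        return elements.isEmpty() ? null : toRowStrict(elements, arity);
    }

    public static void main(String[] args) {
        List<List<String>> row = Arrays.asList((List<String>) null, Arrays.asList("123", "456"));
        String line = serialize(row);
        System.out.println(line); // ,123;456

        String[] cells = line.split(FIELD_DELIMITER, -1);
        System.out.println(toRowLenient(parseCell(cells[0]), 2)); // null
        System.out.println(toRowLenient(parseCell(cells[1]), 2)); // [123, 456]
        try {
            toRowStrict(parseCell(cells[0]), 2);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // Row length mismatch. 2 fields expected but was 0.
        }
    }
}
```

One caveat with the lenient variant: a null row and a row whose cell happens to be empty can no longer be told apart, so a real fix would need to decide how to handle that case.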

In my actual use case, I run two streaming jobs:
 First, read from json_table and sink to csv_table, which has the schema above.
 Then, read from csv_table and process the data further.

If the JSON record is {"f0": null, "f1": {"f1c0": 123, "f1c1": 456}}, the second job fails with the exception above.

If this is a bug, I would like to help fix it and add unit tests.

 

Here is the test code:
{code:java}
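// Imports assume Flink 1.11 package locations.
import org.apache.flink.formats.csv.{CsvRowDataDeserializationSchema, CsvRowDataSerializationSchema}
import org.apache.flink.table.api.DataTypes.{FIELD, INT, ROW, STRING}
import org.apache.flink.table.data.{GenericRowData, RowData}
import org.apache.flink.table.data.binary.BinaryStringData
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo
import org.apache.flink.table.types.logical.RowType
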
val subDataType0 = ROW(
  FIELD("f0c0", STRING()),
  FIELD("f0c1", STRING())
)
val subDataType1 = ROW(
  FIELD("f1c0", INT()),
  FIELD("f1c1", INT())
)
val datatype = ROW(
  FIELD("f0", subDataType0),
  FIELD("f1", subDataType1))
val rowType = datatype.getLogicalType.asInstanceOf[RowType]

val serSchema = new CsvRowDataSerializationSchema.Builder(rowType).build()
val deserSchema = new CsvRowDataDeserializationSchema.Builder(rowType, new RowDataTypeInfo(rowType)).build()
def foo(r: RowData): Unit = {
  val serData = new String(serSchema.serialize(r))
  print(s"${serData}")
  val deserRow = deserSchema.deserialize(serData.getBytes)
  println(s"${deserRow}")
}

val normalRowData = GenericRowData.of(
  GenericRowData.of(BinaryStringData.fromString("hello"), BinaryStringData.fromString("world")),
  GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
)
// Works: the row round-trips correctly.
foo(normalRowData)

val nullRowData = GenericRowData.of(
  null,
  GenericRowData.of(Integer.valueOf(123), Integer.valueOf(456))
)
/*
Exception in thread "main" java.io.IOException: Failed to deserialize CSV row ',123;456
...
Caused by: java.lang.RuntimeException: Row length mismatch. 2 fields expected but was 0.
 */
foo(nullRowData)
{code}


