[jira] [Created] (FLINK-6039) Row of TableFunction should support flexible number of fields

Shang Yuanchun (Jira)
Zhuoluo Yang created FLINK-6039:
-----------------------------------

             Summary: Row of TableFunction should support flexible number of fields
                 Key: FLINK-6039
                 URL: https://issues.apache.org/jira/browse/FLINK-6039
             Project: Flink
          Issue Type: Improvement
            Reporter: Zhuoluo Yang
            Assignee: Zhuoluo Yang


In practice, especially when processing logs with a TableFunction, the log format is often flexible. The number of fields in a Row emitted by a TableFunction should therefore not be fixed.

For example, the following three kinds of TableFunction should all work.
{code}
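import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

// Test for an incomplete row: the arity matches the result type, but only one field is set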
class TableFunc4 extends TableFunction[Row] {
  def eval(str: String): Unit = {
    if (str.contains("#")) {
      str.split("#").foreach({ s =>
        val row = new Row(3)
        row.setField(0, s)  // And we only set values for one column
        collect(row)
      })
    }
  }

  override def getResultType: TypeInformation[Row] = {
    new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
                    BasicTypeInfo.INT_TYPE_INFO,
                    BasicTypeInfo.INT_TYPE_INFO)
  }
}

// Test for a short row: the row has fewer fields than the result type declares
class TableFunc5 extends TableFunction[Row] {
  def eval(str: String): Unit = {
    if (str.contains("#")) {
      str.split("#").foreach({ s =>
        val row = new Row(1)  // ResultType is three columns, we have only one here
        row.setField(0, s)
        collect(row)
      })
    }
  }

  override def getResultType: TypeInformation[Row] = {
    new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO)
  }
}

// Test for overflow row
class TableFunc6 extends TableFunction[Row] {
  def eval(str: String): Unit = {
    if (str.contains("#")) {
      str.split("#").foreach({ s =>
        val row = new Row(5)  // ResultType is two columns, we have five columns here
        row.setField(0, s)
        row.setField(1, s.length)
        row.setField(2, s.length)
        row.setField(3, s.length)
        row.setField(4, s.length)
        collect(row)
      })
    }
  }

  override def getResultType: TypeInformation[Row] = {
    new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
                    BasicTypeInfo.INT_TYPE_INFO)
  }
}
{code}

In fact, TableFunc4 and TableFunc6 already work correctly with the current version. This issue is about making TableFunc5 work as well.
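
To make the intended behaviour concrete, here is a minimal sketch in plain Scala, without Flink. It assumes that missing fields should be read as null and that fields beyond the declared arity should be dropped; the issue implies this but does not state it. The object RowArity and the method fitToArity are hypothetical names used only for illustration. They are not part of Flink and do not describe how the fix is implemented.

```scala
// Hypothetical helper, not part of Flink: fit the field values of an
// emitted row to the arity declared by the result type.
object RowArity {
  def fitToArity(fields: Seq[Any], arity: Int): Seq[Any] = {
    require(arity >= 0, "arity must be non-negative")
    // take() drops overflow fields (the TableFunc6 case);
    // padTo() fills missing fields with null (the TableFunc5 case).
    fields.take(arity).padTo(arity, null)
  }
}
```

Under these assumptions, fitToArity(Seq("a"), 3) gives a three-field sequence whose last two fields are null, and fitToArity(Seq("a", 1, 1, 1, 1), 2) keeps only the first two fields.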



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)