Urgent help on S3 CSV file reader DataStream Job


DEEP NARAYAN Singh
Hi  Guys,

Below is my code snippet, which reads all csv files under the given folder
row by row, but my requirement is to read one csv file at a time and convert
it to json, which will look like:
{"A":"1","B":"3","C":"4","D":9}

Csv file data format:
-------------------------------
field_id,data,
A,1
B,3
C,4
D,9

Code snippet:
--------------------------

final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
String path = "s3://messages/data/test/dev/2020-12-07/67241306/";
TypeInformation[] fieldTypes = new TypeInformation[]{
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO};
RowCsvInputFormat csvFormat =
    new RowCsvInputFormat(new Path(path), fieldTypes);
csvFormat.setSkipFirstLineAsHeader(true);
csvFormat.setNestedFileEnumeration(true);
DataStream<Row> lines = env.readFile(
    csvFormat, path, FileProcessingMode.PROCESS_ONCE, -1);
lines.map(value -> value).print();


Any help is highly appreciated.

Thanks,
-Deep

Re: Urgent help on S3 CSV file reader DataStream Job

Wei Zhong-2
Hi Deep,

(redirecting this to user mailing list as this is not a dev question)

You can try to set the line delimiter and field delimiter of the RowCsvInputFormat to a non-printing character (assuming there are no non-printing characters in the csv files). It will read all the content of a csv file into one Row, e.g.:

final StreamExecutionEnvironment env =
   StreamExecutionEnvironment.getExecutionEnvironment();
String path = "test";
TypeInformation[] fieldTypes = new TypeInformation[]{
   BasicTypeInfo.STRING_TYPE_INFO};
RowCsvInputFormat csvFormat =
   new RowCsvInputFormat(new Path(path), fieldTypes);
csvFormat.setNestedFileEnumeration(true);
csvFormat.setDelimiter((char) 0);
csvFormat.setFieldDelimiter(String.valueOf((char) 0));
DataStream<Row> lines = env.readFile(
    csvFormat, path, FileProcessingMode.PROCESS_ONCE, -1);
lines.map(value -> value).print();
env.execute();

Then you can convert the content of the csv files to json manually.
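For example, the manual conversion could be done with a plain-Java helper like the sketch below (the class name, the header-skipping, and emitting every value as a JSON string are assumptions of this sketch, not part of your requirements):

```java
// Sketch: convert the whole content of one CSV file (read as a single
// string via the zero-delimiter trick above) into one flat JSON object.
// All values are emitted as JSON strings for simplicity.
public class CsvToJson {

    public static String csvToJson(String content) {
        StringBuilder json = new StringBuilder("{");
        String[] lines = content.split("\\R");
        boolean first = true;
        for (int i = 1; i < lines.length; i++) {      // skip the header row
            String line = lines[i].trim();
            if (line.isEmpty()) {
                continue;                             // ignore blank lines
            }
            String[] parts = line.split(",");
            if (parts.length < 2) {
                continue;                             // ignore malformed rows
            }
            if (!first) {
                json.append(",");
            }
            json.append("\"").append(parts[0]).append("\":\"")
                .append(parts[1]).append("\"");
            first = false;
        }
        return json.append("}").toString();
    }

    public static void main(String[] args) {
        String content = "field_id,data,\nA,1\nB,3\nC,4\nD,9";
        System.out.println(csvToJson(content));
    }
}
```

Applied to the stream above it would look something like lines.map(row -> CsvToJson.csvToJson((String) row.getField(0))).print(); (a JSON library would of course be more robust than string concatenation).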

Best,
Wei


> On Dec 7, 2020, at 19:10, DEEP NARAYAN Singh <[hidden email]> wrote:
>
> [...]


Re: Urgent help on S3 CSV file reader DataStream Job

Till Rohrmann
Hi Deep,

Could you use the TextInputFormat which reads a file line by line? That way
you can do the JSON parsing as part of a mapper which consumes the file
lines.
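A minimal sketch of that per-line parsing (the helper name is made up): each CSV line becomes one JSON field, and a plain mapper handles one line at a time.

```java
// Sketch: turn one CSV line into one JSON field.
// Splitting on the first comma only keeps any further commas
// inside the value rather than failing on them.
public class LineToJsonField {

    // "A,1" -> "\"A\":\"1\""
    public static String toJsonField(String csvLine) {
        String[] parts = csvLine.split(",", 2);
        return "\"" + parts[0] + "\":\"" + parts[1] + "\"";
    }

    public static void main(String[] args) {
        System.out.println(toJsonField("A,1"));
    }
}
```

With something like env.readTextFile(path).map(LineToJsonField::toJsonField) you get one JSON field per line; note that assembling all fields of one file into a single JSON object would still need an extra step (e.g. keying by file), since the mapper only sees individual lines.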

Cheers,
Till

On Mon, Dec 7, 2020 at 1:05 PM Wei Zhong <[hidden email]> wrote:

> [...]

Re: Urgent help on S3 CSV file reader DataStream Job

DEEP NARAYAN Singh
Hi Wei and Till,
Thanks for the quick reply.

@Wei, I tried the code you suggested and it works fine, but there is one
case where it fails. Below is the csv input data format:

Csv file data format:
-------------------------------
field_id,data,
A,1
B,3
C,4
D,9
E,0,0,0,0

Because the last row contains more than two values, it throws
org.apache.flink.api.common.io.ParseException: Row too short: field_id,data,

How do I handle this corner case? Could you please suggest a way to
handle it?
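One tolerant variant of the manual conversion that I can imagine splits each row on the first comma only, so the extra values simply end up inside the value string (the class name is just for illustration, and skipping the header by prefix is an assumption):

```java
// Sketch: a lenient CSV-to-JSON conversion that tolerates rows with
// extra commas, such as "E,0,0,0,0". Splitting on the first comma only
// makes everything after the field id the value; blank rows and rows
// without a value are skipped instead of failing.
public class LenientCsvToJson {

    public static String csvToJson(String content) {
        StringBuilder json = new StringBuilder("{");
        boolean first = true;
        for (String raw : content.split("\\R")) {
            String line = raw.trim();
            // skip the header, blank lines and rows without a value
            if (line.isEmpty() || line.startsWith("field_id")
                    || !line.contains(",")) {
                continue;
            }
            String[] parts = line.split(",", 2);   // at most 2 pieces
            if (!first) {
                json.append(",");
            }
            json.append("\"").append(parts[0]).append("\":\"")
                .append(parts[1]).append("\"");
            first = false;
        }
        return json.append("}").toString();
    }

    public static void main(String[] args) {
        System.out.println(csvToJson("field_id,data,\nA,1\nE,0,0,0,0"));
    }
}
```

(I also see that the input format inherits setLenient(true) from GenericCsvInputFormat, which, if I understand correctly, skips rows that fail to parse rather than throwing, but that would drop the row instead of keeping its extra values.)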

@Till, Could you please elaborate on what you are suggesting? In my use
case I am dealing with multiple csv files under the given folder, and if I
read them line by line using TextInputFormat, the transformation will not
work with a plain map operator. Correct me if I'm wrong.

Thanks & Regards,
-Deep


On Mon, Dec 7, 2020 at 6:38 PM Till Rohrmann <[hidden email]> wrote:

> [...]