Hi Guys,
Below is my code snippet, which reads all CSV files under the given folder row by row, but my requirement is to read one CSV file at a time and convert it to JSON, which will look like:

{"A":"1","B":"3","C":"4","D":9}

CSV file data format:
-------------------------------
field_id,data,
A,1
B,3
C,4
D,9

Code snippet:
--------------------------

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String path = "s3://messages/data/test/dev/2020-12-07/67241306/";
TypeInformation[] fieldTypes = new TypeInformation[]{
    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};
RowCsvInputFormat csvFormat = new RowCsvInputFormat(new Path(path), fieldTypes);
csvFormat.setSkipFirstLineAsHeader(true);
csvFormat.setNestedFileEnumeration(true);
DataStream<Row> lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE, -1);
lines.map(value -> value).print();

Any help is highly appreciated.

Thanks,
-Deep
Hi Deep,
(redirecting this to the user mailing list as this is not a dev question)

You can try setting the line delimiter and field delimiter of the RowCsvInputFormat to a non-printing character (assuming there are no non-printing characters in the CSV files). It will then read the entire content of a CSV file into one Row, e.g.:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String path = "test";
TypeInformation[] fieldTypes = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO};
RowCsvInputFormat csvFormat = new RowCsvInputFormat(new Path(path), fieldTypes);
csvFormat.setNestedFileEnumeration(true);
csvFormat.setDelimiter((char) 0);
csvFormat.setFieldDelimiter(String.valueOf((char) 0));
DataStream<Row> lines = env.readFile(csvFormat, path, FileProcessingMode.PROCESS_ONCE, -1);
lines.map(value -> value).print();
env.execute();

Then you can convert the content of the CSV files to JSON manually.

Best,
Wei
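For the manual Row-to-JSON conversion Wei describes, a minimal sketch could look like the following. It assumes the whole file ends up in field 0 of the Row (as with the (char) 0 delimiters above), the two-column field_id,data layout from the original mail, and Jackson being available on the classpath; it also emits every value as a string, so the unquoted 9 in the example output would need extra handling. The class name is only a placeholder.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.types.Row;

// Hypothetical mapper: turns the single-Row content of one CSV file into one JSON string.
public class CsvFileToJson implements MapFunction<Row, String> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public String map(Row fileContent) throws Exception {
        // With the (char) 0 delimiters, the whole file content sits in field 0.
        String raw = (String) fileContent.getField(0);
        ObjectNode json = MAPPER.createObjectNode();

        String[] rows = raw.split("\\r?\\n");
        // Skip the header row and build {"A":"1","B":"3",...}.
        for (int i = 1; i < rows.length; i++) {
            String[] fields = rows[i].split(",");
            if (fields.length >= 2) {
                json.put(fields[0].trim(), fields[1].trim());
            }
        }
        return MAPPER.writeValueAsString(json);
    }
}

It would replace the identity map in the snippet above, e.g. lines.map(new CsvFileToJson()).print();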
Hi Deep,
Could you use the TextInputFormat, which reads a file line by line? That way you can do the JSON parsing as part of a mapper which consumes the file lines.

Cheers,
Till
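A rough sketch of that line-by-line approach could look like the following. The path is taken from the original mail, and the JSON building in the mapper is only a placeholder; note that this produces one JSON fragment per line rather than one JSON object per file, so grouping the lines of a file back together would need additional logic.

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class TextInputFormatExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String path = "s3://messages/data/test/dev/2020-12-07/67241306/";

        TextInputFormat textFormat = new TextInputFormat(new Path(path));
        textFormat.setNestedFileEnumeration(true);

        // Each element of the stream is one text line of some CSV file under the folder.
        DataStream<String> lines = env.readFile(textFormat, path, FileProcessingMode.PROCESS_ONCE, -1);

        lines
            .filter(line -> !line.startsWith("field_id"))   // drop header lines
            .map(line -> {
                String[] fields = line.split(",");
                // Placeholder parsing: one {"<field_id>":"<data>"} fragment per line.
                return "{\"" + fields[0] + "\":\"" + fields[1] + "\"}";
            })
            .print();

        env.execute();
    }
}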
Hi Wei and Till,
Thanks for the quick reply.

@Wei, I tried the code which you suggested and it is working fine, but there is one use case where it is failing. Below is the CSV input data format:

field_id,data,
A,1
B,3
C,4
D,9
E,0,0,0,0

Because of the last row, which contains more than two values, it is throwing

org.apache.flink.api.common.io.ParseException: Row too short: field_id,data,

How do I handle the above corner case? Could you please suggest some way to handle this?

@Till, could you please elaborate more on what you are suggesting? As per my use case I am dealing with multiple CSV files under the given folder, so reading line by line using TextInputFormat and transforming with a map operator will not work. Correct me if I'm wrong.

Thanks & Regards,
-Deep
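One hedged way to cope with a row like E,0,0,0,0 when the whole-file variant from Wei's mail is used is to make the manual split tolerant of extra values, since the splitting then happens in user code rather than in Flink's CSV parser. RowCsvInputFormat also inherits setLenient(true) from GenericCsvInputFormat, which is meant to skip malformed lines instead of failing and might be worth trying. A sketch of the tolerant split, reusing the lines stream from Wei's snippet (the two-column assumption and the string-based JSON building are placeholders, not a confirmed fix):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Tolerant variant of the manual conversion: a row such as "E,0,0,0,0" contributes
// only its first two values instead of breaking the whole file.
DataStream<String> json = lines.map(fileContent -> {
    String raw = (String) fileContent.getField(0);   // whole file content sits in field 0
    StringBuilder sb = new StringBuilder("{");
    String[] rows = raw.split("\\r?\\n");
    for (int i = 1; i < rows.length; i++) {          // skip the header row
        String[] fields = rows[i].split(",", 3);     // limit 3: anything past the 2nd value lands in fields[2] and is ignored
        if (fields.length >= 2) {
            if (sb.length() > 1) {
                sb.append(",");
            }
            sb.append("\"").append(fields[0].trim()).append("\":\"").append(fields[1].trim()).append("\"");
        }
    }
    return sb.append("}").toString();
});
json.print();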