Merge Multiple Files, Then Group and Restructure the Records

【Problem】

Here's the problem statement:

In a folder in HDFS, there are a few CSV files in which each row is a record with the schema (ID, attribute1, attribute2, attribute3).

Any of the columns (except ID) may be null or an empty string, and no two records with the same ID have different non-empty values in the same column.

We'd like to merge all records with the same ID and write the merged records back to HDFS. For example:

Record R1: ID = 1, attribute1 = "hello", attribute2 = null, attribute3 = "";

Record R2: ID = 1, attribute1 = null, attribute2 = null, attribute3 = "testa";

Record R3: ID = 1, attribute1 = null, attribute2 = "okk", attribute3 = "testa";

 

Merged record should be: ID = 1, attribute1 = "hello", attribute2 = "okk", attribute3 = "testa"

I'm just starting to learn Spark. Could anybody share some thoughts on how to write this in Java with Spark? Thanks!

Here are some sample CSV files:

file1.csv:

ID,str1,str2,str3

1,hello,,

file2.csv:

ID,str1,str2,str3

1,,,testa

file3.csv:

ID,str1,str2,str3

1,,okk,testa

The merged file should be:

ID,str1,str2,str3

1,hello,okk,testa

It's known beforehand that there won't be any conflicts on any fields.

Thanks!

【Answer】

Restating the problem: there are N files containing N records in total, which logically fall into M groups by ID. Each group should be collapsed into a single record, in which the value of fields 2-4 is the first non-empty value of that field among the group's records, or empty if all of them are empty.

Writing this in Java (Spark) takes a fair amount of code. Consider implementing it in SPL instead: the code is short and easy to understand, and SPL can access HDFS directly.
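For comparison, a plain Java + Spark SQL version of the same merge might look roughly like the sketch below. It is untested and only illustrative; the HDFS address and the column names ID, str1, str2, str3 are taken from the samples above, and empty strings are normalized to null so that first(..., ignoreNulls) can pick the first non-empty value in each group.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;

public class MergeById {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("MergeById").getOrCreate();

        // Read every CSV file in the folder; the header row supplies the column names.
        Dataset<Row> rows = spark.read()
                .option("header", "true")
                .csv("hdfs://192.168.1.210:9000/user/hfiles/");

        // Treat empty strings as nulls so they can be skipped during aggregation.
        for (String c : new String[]{"str1", "str2", "str3"}) {
            rows = rows.withColumn(c, when(col(c).equalTo(""), lit(null)).otherwise(col(c)));
        }

        // For each ID, keep the first non-null value of every attribute.
        Dataset<Row> merged = rows.groupBy("ID").agg(
                first("str1", true).alias("str1"),
                first("str2", true).alias("str2"),
                first("str3", true).alias("str3"));

        merged.write().option("header", "true")
                .csv("hdfs://192.168.1.210:9000/user/hfiles/result");

        spark.stop();
    }
}

In SPL, the same job fits in a few cells: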

 


    A
1   =["file1.csv","file2.csv","file3.csv"].("hdfs://192.168.1.210:9000/user/hfiles/"+~)
2   =hdfs_client(;"hdfs://192.168.1.210:9000")
3   =A1.conj(hdfs_file(A2,~).import@ct())
4   =A3.group(#1)
5   =A4.new(#1,~.(#2).select@1(~),~.(#3).select@1(~),~.(#4).select@1(~))
6   =hdfs_file(A2,"/user/hfiles/result.csv").export@tc(A5)

 

A1: concatenate the file names into a sequence of full HDFS path strings.


A2: connect to the HDFS file system.

A3: read the contents of each file and merge all the data together.


A4: group by the first column (the ID).

A5: collapse each group into a single record; the value of fields 2-4 is the first non-empty value of that field among the group's records (a small plain-Java sketch of this rule appears after these notes).

A5 can also be simplified to: =A4.new(#1,${to(2,4).("~.(#"/~/").select@1(~)").concat@c()})

A6: write the result file.
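To make the rule in A5 concrete, here is a minimal plain-Java sketch of "first non-empty value within a group", the same selection that select@1 expresses in one call (firstNonEmpty is a hypothetical helper written for illustration, not part of SPL or any library):

import java.util.Arrays;
import java.util.List;

public class FirstNonEmpty {
    // Return the first value in the group that is neither null nor "", or null if none exists.
    static String firstNonEmpty(List<String> values) {
        return values.stream()
                .filter(v -> v != null && !v.isEmpty())
                .findFirst()
                .orElse(null);
    }

    public static void main(String[] args) {
        // str1 values of the three sample records with ID = 1.
        System.out.println(firstNonEmpty(Arrays.asList("hello", null, null))); // prints: hello
    }
}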

The code above is easy to integrate with Java (see "How to Call an SPL Script in Java").
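As a rough sketch of what that integration could look like, assuming the esProc JDBC driver is on the classpath (the driver class name, JDBC URL, and the script name mergeRecords below are illustrative assumptions, not verified against this setup):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CallSplFromJava {
    public static void main(String[] args) throws Exception {
        // Load the esProc JDBC driver; assumes the esProc jars are on the classpath.
        Class.forName("com.esproc.jdbc.InternalDriver");
        try (Connection conn = DriverManager.getConnection("jdbc:esproc:local://");
             Statement st = conn.createStatement()) {
            // "mergeRecords" is a hypothetical name for the saved SPL script above;
            // the script itself writes result.csv back to HDFS, so nothing is returned here.
            st.execute("call mergeRecords()");
        }
    }
}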