Python 和 SPL 对比 12——大数据处理
在数据分析时,经常遇到内存无法放下的数据,需要借助硬盘完成数据分析,本文对比 Python 和 SPL 对这种数量级数据的运算分析能力,至于更大级别如 PB 级别的数据则需要分布式系统来完成分析了,不在本文讨论范围内。
聚合
简单聚合只要遍历一遍数据,按照聚合目标将聚合列计算一遍即可。如:求和(sum),遍历数据时对读取的数据进行累加;计数(count),遍历数据时,记录遍历数;平均(mean),遍历时同时记录累计和和遍历数,最后相除即可。这里以求和问题为例进行介绍。
现有如下文件,请计算订单总额。
数据片段如下:
orderkey orderdate state quantity amount
1 2008-01-01 Wyoming 10 282.7
2 2008-01-01 Indiana 3 84.81
3 2008-01-01 Nebraska 82 2318.14
4 2008-01-01 West Virginia 9 254.43
Python
order_file="D:\data\orders.txt" total=0 with open(order_file,'r') as f: line=f.readline() while True: line = f.readline() if not line: break total += float(line.split("\t")[4]) print(total) |
打开文件 标题行
逐行读入 读不到内容时结束
累加
|
Python可以逐行读取,找到每行的订单金额字段,然后不断累加汇总完成最终的求和运算,这也谈不上什么技巧可言,就是硬编码。Python 还可以借助 Pandas 分块读取,完成求和运算,代码如下:
import pandas as pd order_file="D:\data\orders.txt" chunk_data = pd.read_csv(order_file,sep="\t",chunksize=100000) total=0 for chunk in chunk_data: total+=chunk['amount'].sum() print(total) |
分块读取文件,每块 10 万行
累加各块的销售额
|
Pandas 允许分块读取数据,每块汇总很简单,然后循环所有块,对所有块汇总得到最终的求和结果。这种方式在编码上略显简单,运算效率上也略有提升,但也算是硬编码,写起来还是很麻烦。
SPL
A |
B |
|
1 |
D:\data\orders.txt |
|
2 |
=file(A1).cursor@t() |
/创建游标 |
3 |
=A2.total(sum(amount)) |
SPL利用游标计算完成大数据计算,提供了丰富的游标计算函数,total 函数就是其中一个,为了重复利用游标,total 还允许同时进行多个聚合运算,如求和,最大值同时计算,代码这样写:
A2.total(sum(amount),max(quantity))
简单的一句代码就可以完成订单总额的汇总和最大数量的计算,这相较于 Python 简直太简单了,而且运算效率还高。
过滤
过滤和聚合差不多,将大文件分成 n 段,对各段进行过滤,最后将每一段的结果进行合并即可。
继续以上面数据为例,过滤出纽约州的销售信息
小结果集
Python
import pandas as pd order_file="D:\data\orders.txt" chunk_data = pd.read_csv(order_file,sep="\t",chunksize=100000) chunk_list = [] for chunk in chunk_data: chunk_list.append(chunk[chunk.state=="New York"]) res = pd.concat(chunk_list) print(res) |
定义空列表存放结果 分段过滤
合并结果 |
Python依托 pandas 分段过滤,找出满足条件的结果,写起来还是比较麻烦,逐行读取的方式也可以完成过滤,写起来没有什么技巧可言,按逻辑写代码就可以了,这里就不再写出来了。
大结果集
大结果集是指过滤以后的结果内存也放不下。
import pandas as pd infile="D:\data\orders.txt" ofile="D:\data\orders_filter.txt" chunk_data = pd.read_csv(infile,sep="\t",chunksize=100000) n=0 for chunk in chunk_data: need_data = chunk[chunk.state=='New York'] if n == 0: need_data.to_csv(ofile,index=None) n+=1 else: need_data.to_csv(ofile,index=None,mode='a',header=None) |
分块读取
过滤
写出文件
追加文件
|
大结果集内存放不下,只好将过滤的结果放到硬盘,供后续计算用,Pandas 分块处理后,分块过滤然后存储到硬盘。
SPL
A |
B |
|
1 |
D:\data\orders.txt |
|
2 |
=file(A1).cursor@t() |
|
3 |
=A2.select(state=="New York") |
|
4 |
=A3.fetch() |
/小结果集取出 |
5 |
=file(out_file).export@tc(A3) |
/大结果集存到硬盘 |
SPL 过滤游标时使用 select 函数,它和内存计算时有区别,不会立即执行选出的动作,只是记录下来要执行选出动作,在 fetch 或写出时才会真的执行选出动作。
注意:A4 是将小结果集保留在内存,A5 是将大结果集存到硬盘,两者不能同时执行。
排序
大数据排序也是比较常见的操作,但这个开销会非常大。如:
按订单金额对订单排序。
Python
import pandas as pd import os import time import shutil import uuid import traceback def parse_type(s): if s.isdigit(): return int(s) try: res = float(s) return res except: return s
def pos_by(by, head, sep): by_num = 0 for col in head.split(sep): if col.strip() == by: break else: by_num += 1 return by_num
def merge_sort(directory, ofile, by, ascending=True, sep=","): with open(ofile, 'w') as outfile: file_list = os.listdir(directory) file_chunk = [open(directory + "/" + file, 'r') for file in file_list] k_row = [file_chunk[i].readline()for i in range(len(file_chunk))] by = pos_by(by, k_row[0], sep) outfile.write(k_row[0]) k_row = [file_chunk[i].readline()for i in range(len(file_chunk))] k_by = [parse_type(k_row[i].split(sep)[by].strip()) for i in range(len(file_chunk))] with open(ofile, 'a') as outfile: while True: for i in range(len(k_by)): if i >= len(k_by): break sorted_k_by = sorted(k_by) if ascending else sorted(k_by, reverse=True) if k_by[i] == sorted_k_by[0]: outfile.write(k_row[i]) k_row[i] = file_chunk[i].readline() if not k_row[i]: file_chunk[i].close() del (file_chunk[i]) del (k_row[i]) del (k_by[i]) else: k_by[i] = parse_type(k_row[i].split(sep)[by].strip()) if len(k_by) == 0: break
def external_sort(file_path, by, ofile, tmp_dir, ascending=True, chunksize=50000, sep=',', usecols=None,index_col=None): os.makedirs(tmp_dir, exist_ok=True) try: data_chunk = pd.read_csv(file_path, sep=sep, usecols=usecols, index_col=index_col, chunksize=chunksize) for chunk in data_chunk: chunk = chunk.sort_values(by, ascending=ascending) chunk.to_csv(tmp_dir + "/" + "chunk" + str(int(time.time() * 10 ** 7))+ str(uuid.uuid4()) + ".csv", index=None, sep=sep) merge_sort(tmp_dir, ofile=ofile, by=by, ascending=ascending, sep=sep) except Exception: print(traceback.format_exc()) finally: shutil.rmtree(tmp_dir, ignore_errors=True) if __name__ == "__main__": infile = "D:\data\orders.txt" ofile = "D:\data\orders_sort.txt" tmp = "D:/data/tmp" external_sort(infile, 'amount', ofile, tmp, ascending=True, chunksize=1000000, sep='\t') |
解析字符串数据类型
计算要排序的列名在表头中的位置
外存归并排序
列出临时文件 打开临时文件 读取表头 要排序的列在表头的位置 写出表头 读取正文 维护一个 k 个元素的列表,存放 k 个排序列值
排序
读完一个文件处理一个
所有文件读完结束
外存排序
分块读取
内存排序 写出到硬盘
归并排序
删除临时目录 主程序
外存排序 |
Python 没有现成的外存排序函数,只能自己写,外存归并排序相比于聚合和过滤,代码相当复杂了,对于很多非专业程序员来讲已经是不太可能实现的任务了,而且它的运算效率也不高。
SPL
A |
B |
|
1 |
D:\data\orders.txt |
|
2 |
D:\data\orders_sort.txt |
|
3 |
=file(A1).cursor@t() |
/创建游标 |
4 |
=A3.sortx(amount) |
/排序 |
5 |
=file(A2).export@t(A4) |
/写出 |
SPL 提供了游标排序函数 sortx(),再写法上和内存的 sort() 类似,只是它会返回游标,取其中的数据时需要 fetch 或者写出到硬盘。这相较于 Python 的硬编码,无论是写还是算都有数量级的提升。
分组和关联
大数据分组和大数据关联对于 Python 来说太难了,涉及 Hash 的过程,对于普通程序员来说也不太可能用 Python 来实现,所以这里就不再列出 Python 的代码了。
来看看 SPL 怎么分组和关联吧。
分组
汇总各州订单的销售额。
SPL
A |
B |
|
1 |
D:\data\orders.txt |
|
2 |
=file(A1).cursor@t() |
|
3 |
=A2.groups(state;sum(amount):amount) |
SPL 的 groups 函数支持游标分组聚合,用法和内存时相同,而且内部实现了 Hash 分组的高效代码,所以写起来简单而且运算效率高。
关联
假设订单表内有客户的 ID 字段,计算时需要关联客户信息。
SPL
A |
B |
|
1 |
D:\data\orders.txt |
|
2 |
D:\data\client.txt |
|
3 |
=file(A1).cursor@t() |
|
4 |
=file(A2).import@t() |
|
5 |
=A3.switch(client,A4:clientid) |
/关联 |
6 |
=A5.groups(...) |
/分组聚合 |
SPL 提供了丰富的游标函数,switch 函数和内存时的用法也类似,只不过返回延迟游标,在后续执行代码时才会真的运行 switch 动作,整段代码和内存函数差不多,很好写,也好理解,运行速度还快。
小结
Python 处理大文件是真费劲,主要是因为它没有提供为大数据服务的游标类型及相关运算,只能自己写代码,不仅繁琐而且运算效率低。、
SPL 拥有完善的游标系统,多数游标函数的用法和内存函数没有什么区别,对用户很友好,而且内部都是用的高效算法,运行效率很高。
英文版