Python 和 SPL 对比 12——大数据处理

在数据分析时,经常遇到内存无法放下的数据,需要借助硬盘完成数据分析,本文对比 Python 和 SPL 对这种数量级数据的运算分析能力,至于更大级别如 PB 级别的数据则需要分布式系统来完成分析了,不在本文讨论范围内。

聚合

简单聚合只要遍历一遍数据,按照聚合目标将聚合列计算一遍即可。如:求和(sum),遍历数据时对读取的数据进行累加;计数(count),遍历数据时,记录遍历数;平均(mean),遍历时同时记录累计和和遍历数,最后相除即可。这里以求和问题为例进行介绍。

现有如下文件,请计算订单总额。

数据片段如下:

orderkey orderdate      state       quantity amount

1     2008-01-01  Wyoming      10   282.7

2     2008-01-01  Indiana   3     84.81

3     2008-01-01  Nebraska       82   2318.14

4     2008-01-01  West Virginia 9     254.43

Python

order_file="D:\data\orders.txt"

total=0

with   open(order_file,'r') as f:

    line=f.readline()

    while True:

        line = f.readline()

        if not line:

            break

        total +=   float(line.split("\t")[4])

print(total)

 

 

打开文件

标题行

 

逐行读入

读不到内容时结束

 

累加

 

Python可以逐行读取,找到每行的订单金额字段,然后不断累加汇总完成最终的求和运算,这也谈不上什么技巧可言,就是硬编码。Python 还可以借助 Pandas 分块读取,完成求和运算,代码如下:

import pandas as pd

order_file="D:\data\orders.txt"

chunk_data =   pd.read_csv(order_file,sep="\t",chunksize=100000)

total=0

for chunk in chunk_data:

      total+=chunk['amount'].sum()

print(total)

 

 

分块读取文件,每块 10 万行

 

 

累加各块的销售额

 

Pandas 允许分块读取数据,每块汇总很简单,然后循环所有块,对所有块汇总得到最终的求和结果。这种方式在编码上略显简单,运算效率上也略有提升,但也算是硬编码,写起来还是很麻烦。

SPL


A

B

1

D:\data\orders.txt


2

=file(A1).cursor@t()

/创建游标

3

=A2.total(sum(amount))


SPL利用游标计算完成大数据计算,提供了丰富的游标计算函数,total 函数就是其中一个,为了重复利用游标,total 还允许同时进行多个聚合运算,如求和,最大值同时计算,代码这样写:

A2.total(sum(amount),max(quantity))

简单的一句代码就可以完成订单总额的汇总和最大数量的计算,这相较于 Python 简直太简单了,而且运算效率还高。

过滤

过滤和聚合差不多,将大文件分成 n 段,对各段进行过滤,最后将每一段的结果进行合并即可。

继续以上面数据为例,过滤出纽约州的销售信息

小结果集

Python

import pandas as   pd

order_file="D:\data\orders.txt"

chunk_data =   pd.read_csv(order_file,sep="\t",chunksize=100000)

chunk_list = []

for chunk in   chunk_data:

      chunk_list.append(chunk[chunk.state=="New York"])

res =   pd.concat(chunk_list)

print(res)

 

 

 

定义空列表存放结果

分段过滤

 

合并结果

Python依托 pandas 分段过滤,找出满足条件的结果,写起来还是比较麻烦,逐行读取的方式也可以完成过滤,写起来没有什么技巧可言,按逻辑写代码就可以了,这里就不再写出来了。

大结果集

大结果集是指过滤以后的结果内存也放不下。

import pandas as pd

infile="D:\data\orders.txt"

ofile="D:\data\orders_filter.txt"

chunk_data =   pd.read_csv(infile,sep="\t",chunksize=100000)

n=0

for chunk in chunk_data:

    need_data =   chunk[chunk.state=='New York']

    if n == 0:

          need_data.to_csv(ofile,index=None)

        n+=1

    else:

          need_data.to_csv(ofile,index=None,mode='a',header=None)

 

 

 

分块读取

 

 

过滤

 

写出文件

 

追加文件

 

大结果集内存放不下,只好将过滤的结果放到硬盘,供后续计算用,Pandas 分块处理后,分块过滤然后存储到硬盘。

SPL


A

B

1

D:\data\orders.txt


2

=file(A1).cursor@t()


3

=A2.select(state=="New   York")


4

=A3.fetch()

/小结果集取出

5

=file(out_file).export@tc(A3)

/大结果集存到硬盘

SPL 过滤游标时使用 select 函数,它和内存计算时有区别,不会立即执行选出的动作,只是记录下来要执行选出动作,在 fetch 或写出时才会真的执行选出动作。

注意:A4 是将小结果集保留在内存,A5 是将大结果集存到硬盘,两者不能同时执行。

排序

大数据排序也是比较常见的操作,但这个开销会非常大。如:

按订单金额对订单排序。

Python

import pandas as   pd

import os

import time

import shutil

import uuid

import traceback

def parse_type(s):

    if s.isdigit():

        return int(s)

    try:

        res = float(s)

        return res

    except:

        return s

 

def pos_by(by,   head, sep):

    by_num = 0

    for col in head.split(sep):

        if col.strip() == by:

            break

        else:

            by_num += 1

    return by_num

 

def   merge_sort(directory, ofile, by, ascending=True, sep=","):

    with open(ofile, 'w') as outfile:

        file_list = os.listdir(directory)

        file_chunk = [open(directory +   "/" + file, 'r') for file in file_list]

        k_row = [file_chunk[i].readline()for   i in range(len(file_chunk))]

        by = pos_by(by, k_row[0], sep)

        outfile.write(k_row[0])

    k_row = [file_chunk[i].readline()for i   in range(len(file_chunk))]

    k_by = [parse_type(k_row[i].split(sep)[by].strip())  for i in range(len(file_chunk))]

    with open(ofile, 'a') as outfile:

        while True:

            for i in range(len(k_by)):

                if i >= len(k_by):

                    break

                sorted_k_by = sorted(k_by) if   ascending else sorted(k_by, reverse=True)

                if k_by[i] == sorted_k_by[0]:

                    outfile.write(k_row[i])

                    k_row[i] =   file_chunk[i].readline()

                    if not k_row[i]:

                        file_chunk[i].close()

                        del (file_chunk[i])

                        del (k_row[i])

                        del (k_by[i])

                    else:

                        k_by[i] =   parse_type(k_row[i].split(sep)[by].strip())

            if len(k_by) == 0:

                break

 

def   external_sort(file_path, by, ofile, tmp_dir, ascending=True, chunksize=50000,   sep=',', usecols=None,index_col=None):

    os.makedirs(tmp_dir, exist_ok=True)

    try:

        data_chunk = pd.read_csv(file_path,   sep=sep, usecols=usecols, index_col=index_col, chunksize=chunksize)

        for chunk in data_chunk:

            chunk = chunk.sort_values(by,   ascending=ascending)

            chunk.to_csv(tmp_dir +   "/" + "chunk" + str(int(time.time() * 10 ** 7))+   str(uuid.uuid4()) + ".csv", index=None, sep=sep)

        merge_sort(tmp_dir, ofile=ofile,   by=by, ascending=ascending, sep=sep)

    except Exception:

        print(traceback.format_exc())

    finally:

        shutil.rmtree(tmp_dir,   ignore_errors=True)

if __name__ ==   "__main__":

    infile = "D:\data\orders.txt"

    ofile =   "D:\data\orders_sort.txt"

    tmp = "D:/data/tmp"

    external_sort(infile, 'amount', ofile,   tmp, ascending=True, chunksize=1000000, sep='\t')

 

 

 

 

 

 

解析字符串数据类型

 

 

 

 

 

 

 

 

计算要排序的列名在表头中的位置

 

 

 

 

 

 

 

 

外存归并排序

 

列出临时文件

打开临时文件

读取表头

要排序的列在表头的位置

写出表头

读取正文

维护一个 k 个元素的列表,存放 k 个排序列值

 

 

 

 

 

排序

 

 

 

 

读完一个文件处理一个

 

 

 

 

 

 

所有文件读完结束

 

 

外存排序

 

 

 

分块读取

 

 

内存排序

写出到硬盘

 

归并排序

 

 

 

 

删除临时目录

主程序

 

 

 

外存排序

Python 没有现成的外存排序函数,只能自己写,外存归并排序相比于聚合和过滤,代码相当复杂了,对于很多非专业程序员来讲已经是不太可能实现的任务了,而且它的运算效率也不高。

SPL


A

B

1

D:\data\orders.txt


2

D:\data\orders_sort.txt


3

=file(A1).cursor@t()

/创建游标

4

=A3.sortx(amount)

/排序

5

=file(A2).export@t(A4)

/写出

SPL 提供了游标排序函数 sortx(),再写法上和内存的 sort() 类似,只是它会返回游标,取其中的数据时需要 fetch 或者写出到硬盘。这相较于 Python 的硬编码,无论是写还是算都有数量级的提升。

分组和关联

大数据分组和大数据关联对于 Python 来说太难了,涉及 Hash 的过程,对于普通程序员来说也不太可能用 Python 来实现,所以这里就不再列出 Python 的代码了

来看看 SPL 怎么分组和关联吧。

分组

汇总各州订单的销售额。

SPL


A

B

1

D:\data\orders.txt


2

=file(A1).cursor@t()


3

=A2.groups(state;sum(amount):amount)


SPL 的 groups 函数支持游标分组聚合,用法和内存时相同,而且内部实现了 Hash 分组的高效代码,所以写起来简单而且运算效率高。

关联

假设订单表内有客户的 ID 字段,计算时需要关联客户信息。

SPL


A

B

1

D:\data\orders.txt


2

D:\data\client.txt


3

=file(A1).cursor@t()


4

=file(A2).import@t()


5

=A3.switch(client,A4:clientid)

/关联

6

=A5.groups(...)

/分组聚合

SPL 提供了丰富的游标函数,switch 函数和内存时的用法也类似,只不过返回延迟游标,在后续执行代码时才会真的运行 switch 动作,整段代码和内存函数差不多,很好写,也好理解,运行速度还快。

小结

Python 处理大文件是真费劲,主要是因为它没有提供为大数据服务的游标类型及相关运算,只能自己写代码,不仅繁琐而且运算效率低。、

SPL 拥有完善的游标系统,多数游标函数的用法和内存函数没有什么区别,对用户很友好,而且内部都是用的高效算法,运行效率很高。