集算器实现外部数据并行计算
【摘要】
SPL 可将文本文件按体积大致分为 N 段,而不是按行号精确分段,如此可大幅提高并行计算性能。通过 JDBC 取数时,有时会遇到数据库负载虽然不重,但取数性能仍然较差的情况,这种情况下可以用并行取数提高性能。当数据量太大时,除了分库计算,还可以进行混合数据源并行计算,后者性能更高。点击集算器实现外部数据并行计算了解详情。
文本并行
SPL可将文本文件按体积大致分为N段,只读取其中一段。比如cardInfo.txt存储着一千万条人口信息,将其分为十份,取第二份,代码可以写作:
A |
B |
|
1 |
=file("d:\\temp\\cardInfo千万.txt") |
|
2 |
=A1.import@t(;2:10) |
/直接读入内存 |
3 |
=A1.cursor@t(;2:10).fetch@x() |
/游标方式读取 |
按体积大致分段,而不是按行数精确分段,目的是提高分段性能。比如在IDE中观察A2或A3的前几个字段,可以看到行数并非精确的100万(与具体数据有关):
index |
cardNo |
name |
gender |
province |
mobile |
1 |
308200310180525 |
Alison Clinton |
female |
Idaho |
1024627490 |
2 |
709198311300191 |
Abby Wood |
female |
Kansas |
19668466 |
3 |
1005199807060610 |
George Bush |
male |
California |
1019879226 |
… |
… |
… |
… |
… |
… |
1000005 |
405199907050256 |
Mark Rowswell |
male |
Idaho |
1168620176 |
分段读取可应用于多线程计算,从而提高读取性能。比如用2个线程分别读取cardInfo.txt,各线程计算本段行数,最后合并为总行数,可用如下代码:
5 |
fork to(2) |
=A1.cursor@t(;A5:2).total(count(1)) |
/2线程分段 |
6 |
=A5.sum() |
/合并结果 |
语句fork语句适合算法较复杂的情况,当算法比较简单时,可用cursor@m直接分段读取。比如前面的代码可以改写如下:
7 |
=A1.cursor@tm(;2).total(count(1)) |
/2线程分段 |
上述代码指定了线程数,如果省略线程数,则用配置文件中的“parallet limit”当做默认线程数。假设parallet limit=2,则上述代码可以改写成:
8 |
=A1.cursor@tm().total(count(1)) |
/默认线程数分段 |
为了验证分段读取前后的性能差异,下面设计一个算法,分别用单线程和2线程计算cardInfo.txt的总行数,可以看到性能显著提升:
11 |
=now() |
|
12 |
=A1.cursor@t().total(count(1)) |
|
13 |
=interval@ms(A11,now()) |
/未分段,20882ms |
14 |
||
15 |
=now() |
|
16 |
=A1.cursor@tm(;2).total(count(1)) |
|
17 |
=interval@ms(A15,now()) |
/2线程分段,12217ms |
JDBC 并行
通过JDBC取数时,有时会遇到数据库负载虽然不重,但取数性能仍然较差的情况,这种情况下可以用并行取数提高性能。
比如Oracle数据库有一张通话记录表callrecord,记录数100万条,索引字段是callTime,且数据基本按该字段平均分布。采用非并行取数时,可以发现性能不够理想,代码如下:
A |
B |
|
1 |
=now() |
/记录时间,用于测试性能 |
2 |
=connect("orcl") |
|
3 |
=A2.query@x("select * from callrecord") |
|
4 |
=interval@ms(A1,now()) |
/非并行取数,17654ms |
改为2线程并行取数后,可以看到性能提升明显,代码如下:
6 |
=now() |
|
7 |
=connect("orcl").query@x("select min(callTime),max(callTime) from callrecordA") |
|
8 |
=2.(range(A7.#1,elapse@s(A7.#2,1),~:2)) |
/时间区间参数列表 |
9 |
fork A8 |
=connect("orcl") |
10 |
=B9.query@x("select * from callrecordA where callTime>=? and callTime<?",A9(1),A9(2)) |
|
11 |
=A9.conj() |
|
12 |
=interval@ms(A6,now()) |
/并行取数,10045ms |
既然要并行取数,就要把源数据分成多个区间,使每区间的数据量大致相等。在这个例子中,索引字段是时间类型callTime,所以先用A7求出callTime的数据范围,再用A8将该范围平均分成2个时间区间。之后在A9进行并行计算,每个线程以各自的时间区间为参数执行SQL,取数结果将大致相等。最后合并多线程的取数结果,作为最终结果。
函数range非常适合对数据分段。该函数可将某范围平均分为N个区间,获得第i个区间,且可根据范围的数据类型自动调整区间的数据类型。本例的范围类型是datetime,则函数range将范围按秒均分,返回类型也是datetime。如果范围类型是date,则函数range按天均分;如果范围类型是整数,则函数range按整数均分。
上面例子中,分段字段是索引,如果没有建立索引,则查询性能会出现下降。在这种情况下,并行取数仍然可以带来明显的性能提升,所以可以用相同的方法。
上面例子中,源数据基本按callTime平均分布,因此容易使各区间的数据量大致相等,如果源数据分布很不平均,可以考虑按行号分段。每种数据库都有生成行号的方法,比如oralce可用rownum。
除了单表单SQL并行取数,SPL也支持多表多SQL并行取数。比如某报表格式较复杂,需要SPL执行多个SQL,并按一定的格式拼出结果集。当采用非并行取数时,可以发现性能不够理想,代码如下:
A |
B |
|
1 |
=now() |
=connect("orcl") |
2 |
select count(1) from callrecordA where to_char(calltime,'yyyy')='2015' |
=B1.query(A2) |
3 |
select count(1) from callrecordA where to_char(calltime,'yyyyMM')='201501' |
=B1.query(A3) |
4 |
select count(1) from callrecordA where to_char(calltime,'yyyyMM')='201502' |
=B1.query(A4) |
5 |
select count(1) from callrecordA where to_char(calltime,'yyyyMM')='201503' |
=B1.query(A5) |
6 |
select count(1) from callrecordA where to_char(calltime,'yyyy')='2016' |
=B1.query(A6) |
7 |
select count(1) from callrecordA where to_char(calltime,'yyyyMM')='201601' |
=B1.query(A7) |
8 |
select count(1) from callrecordA where to_char(calltime,'yyyyMM')='201602' |
=B1.query(A8) |
9 |
select count(1) from callrecordA where to_char(calltime,'yyyyMM')='201603' |
=B1.query(A9) |
10 |
=B1.close() |
|
11 |
=[B2:B9].new(~.#1:data) |
|
12 |
=interval@ms(A1,now()) |
/非并行取数,2195毫秒 |
改为4线程并行取数后,可以看到性能提升明显,代码如下:
14 |
=now() |
|
15 |
fork [A2:A9] |
=connect("orcl") |
16 |
=B15.query@x(A15) |
|
17 |
=A15.new(~.#1:data) |
|
18 |
=interval@ms(A14,now()) |
/4并行取数,1320毫秒 |
需要注意的是,并行取数时任务数可大于并行数。比如上面代码共8个任务,但同时执行的任务只有4个,其他待执行的任务排在队列中,如果某个小任务先执行完成,SPL会从队列中取下一个任务并执行它。可以看到,当任务数较多时,即使各任务负载相差较大,也能充分发挥硬件性能。
混合并行
当数据量太大时,除了分库计算,还可以进行混合数据源并行计算,后者性能更高。具体做法是:把数据分为两部分(或多部分),一部分存储在数据库中,通常是当前实时数据,一部分存储在组文件,通常是历史数据,再对两种数据源进行并行计算,从而获得更高性能。
比如历史订单存储在orders.ctx中,当前订单存储在数据库orcl中,请按年、月分组,对各组数据的amount字段求和。SPL代码如下:
A |
B |
|
1 |
fork |
select extract(year from orderTime)y,extract(month from orderTime)m,sum(amount) amount from orders group by extract(year from orderTime),extract(month from orderTime) |
2 |
=connect("orcl") |
|
3 |
=B2.query@x(B1) |
|
4 |
fork |
=file("orders.ctx").create() |
5 |
=B4.groups(year(ORDERTIME):Y,month(ORDERTIME):M;sum(AMOUNT):AMOUNT) |
|
6 |
=[A1,A4].conj() |
|
7 |
=A6.groups(Y,M;sum(AMOUNT):AMOUNT) |
注意fork……fork……的用法。如果fork语句块下接非fork语句块,则两者顺序执行,如果fork语句块下接fork语句块,则两者并行执行。
英文版