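SPL Practice: Moving Computation Out of the Database
I. Moving the Data Out
When the TP database is overloaded, SPL can take over the AP workload. The first step is to move the data out of the TP database.
Relational databases are usually read through a JDBC connection. The examples below use Oracle, with the TPC-H data structure.
The following script dumps the ORDERS table into an SPL high-performance composite table file: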
|   | A |
| 1 | =connect@l("oracle12c") |
| 2 | =A1.cursor@x("SELECT O_ORDERKEY,O_CUSTKEY,O_ORDERSTATUS,O_TOTALPRICE,O_ORDERDATE,O_ORDERPRIORITY,O_CLERK,O_SHIPPRIORITY,O_COMMENT FROM ORDERS ORDER BY 1") |
| 3 | =file("orders.ctx").create(#o_orderkey,o_custkey,o_orderstatus,o_totalprice,o_orderdate,o_orderpriority,o_clerk,o_shippriority,o_comment) |
| 4 | >A3.append@i(A2) |
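Fetching data through JDBC is slow, but it can be sped up by fetching in parallel on multiple cores. The LINEITEM table is large, so the single-threaded approach above would take a long time.
Instead, split the data into n parts by the remainder of L_ORDERKEY divided by n (0, ..., n-1). Each part is fetched by its own thread and dumped into a bin file, and the n bin files are then merged into one composite table file: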
|   | A | B |
| 1 | fork 4.() | =connect@l("oracle12c") |
| 2 |   | =B1.cursor@x("SELECT L_ORDERKEY,L_LINENUMBER,L_PARTKEY,L_SUPPKEY,L_QUANTITY,L_EXTENDEDPRICE,L_DISCOUNT,L_TAX,L_RETURNFLAG,L_LINESTATUS,L_SHIPDATE,L_COMMITDATE,L_RECEIPTDATE,L_SHIPINSTRUCT,L_SHIPMODE,L_COMMENT FROM LINEITEM WHERE MOD(L_ORDERKEY,4)="/(A1-1)/" ORDER BY 1,2") |
| 3 |   | =file("lineitem"/A1/".btx").export@b(B2) |
| 4 | =directory("lineitem?.btx") |   |
| 5 | =A4.(file(~).cursor@b()).merge(#1,#2) |   |
| 6 | =file("lineitem.ctx").create(#l_orderkey,#l_linenumber,l_partkey,l_suppkey,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment) |   |
| 7 | >A6.append@i(A5) |   |
| 8 | =A4.(movefile(~)) |   |
A1: in fork n.(), n is 4 here, which is the number of parallel threads.
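The split-and-merge idea can be sketched outside SPL as well. The Python below is only an illustration of the technique, not SPL code: `SOURCE_ROWS`, `fetch_part` and `parallel_fetch_and_merge` are hypothetical names, and the in-memory list stands in for the LINEITEM table. Each worker fetches the rows whose key has a given remainder, already sorted, and a k-way merge restores the global order.

```python
import heapq
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for LINEITEM: (l_orderkey, l_linenumber) pairs, deliberately unsorted.
SOURCE_ROWS = [(k, ln) for k in range(10, 0, -1) for ln in (2, 1)]

def fetch_part(remainder, n):
    """Stand-in for: SELECT ... WHERE MOD(L_ORDERKEY, n) = remainder ORDER BY 1, 2."""
    return sorted(row for row in SOURCE_ROWS if row[0] % n == remainder)

def parallel_fetch_and_merge(n):
    """Fetch the n parts concurrently, then merge the sorted parts into one sorted list."""
    with ThreadPoolExecutor(max_workers=n) as pool:
        parts = list(pool.map(lambda r: fetch_part(r, n), range(n)))
    # Every part is already sorted, so a k-way merge restores the global order
    # without sorting the whole data set again.
    return list(heapq.merge(*parts))

merged = parallel_fetch_and_merge(4)
```

Because each part arrives sorted by (key, line number), the final merge only compares the heads of the n streams, which is why the per-thread ORDER BY matters in the script above.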
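When the database holds a large amount of data and is close to saturation, an ORDER BY in the SQL forces the database to sort a large volume of data before returning it, and the database may stall during the sort. To avoid this, leave the ORDER BY out of the SQL and sort with SPL's cursor sort (cs.sortx) instead. While it runs, you can check whether sortx keeps producing temporary files to judge whether the program is working normally.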
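For readers unfamiliar with why a cursor sort produces temporary files, here is a minimal external merge sort in Python. It is a sketch of the general technique under stated assumptions, not a description of how sortx is implemented; `write_runs`, `read_run` and `merge_runs` are hypothetical helpers. Rows are sorted one chunk at a time, each sorted chunk is spilled to its own temporary file, and the files are then merged.

```python
import heapq
import os
import pickle
import tempfile

def write_runs(rows, chunk_size, tmp_dir):
    """Sort rows chunk by chunk and spill each sorted chunk to its own temporary file."""
    paths, chunk = [], []

    def spill():
        chunk.sort()
        fd, path = tempfile.mkstemp(suffix=".run", dir=tmp_dir)
        with os.fdopen(fd, "wb") as f:
            for row in chunk:
                pickle.dump(row, f)
        paths.append(path)
        chunk.clear()

    for row in rows:
        chunk.append(row)
        if len(chunk) >= chunk_size:
            spill()
    if chunk:
        spill()
    return paths

def read_run(path):
    """Stream the rows of one sorted run back from disk."""
    with open(path, "rb") as f:
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                return

def merge_runs(paths):
    """Lazily merge all sorted runs into one sorted stream."""
    return heapq.merge(*(read_run(p) for p in paths))

with tempfile.TemporaryDirectory() as tmp_dir:
    data = [(k * 7) % 10 for k in range(10)]  # a shuffled 0..9
    runs = write_runs(data, chunk_size=3, tmp_dir=tmp_dir)
    result = list(merge_runs(runs))
```

In this sketch a new run file appears every `chunk_size` rows, which is the kind of steady temporary-file growth the paragraph above suggests watching for.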
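Incremental data can be fetched by a timestamp field, for example at a fixed time every day.
Suppose O_ORDERDATE in the ORDERS table is the timestamp field and O_ORDERKEY is increasing, and the LINEITEM table references the primary key O_ORDERKEY of ORDERS through the foreign key L_ORDERKEY. The following script fetches the incremental data and merges it with the cold data: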
|   | A |
| 1 | =connect@l("oracle12c") |
| 2 | =st |
| 3 | =now() |
| 4 | =maxkey |
| 5 | =A1.cursor("SELECT O_ORDERKEY,O_CUSTKEY,O_ORDERSTATUS,O_TOTALPRICE,O_ORDERDATE,O_ORDERPRIORITY,O_CLERK,O_SHIPPRIORITY,O_COMMENT FROM ORDERS WHERE O_ORDERDATE>? AND O_ORDERDATE<=? ORDER BY 1",A2,A3) |
| 6 | =A1.cursor@x("SELECT L_ORDERKEY,L_LINENUMBER,L_PARTKEY,L_SUPPKEY,L_QUANTITY,L_EXTENDEDPRICE,L_DISCOUNT,L_TAX,L_RETURNFLAG,L_LINESTATUS,L_SHIPDATE,L_COMMITDATE,L_RECEIPTDATE,L_SHIPINSTRUCT,L_SHIPMODE,L_COMMENT FROM LINEITEM WHERE L_ORDERKEY>? ORDER BY 1,2",A4) |
| 7 | =file("orders.ctx").open().cursor() |
| 8 | =[A5,A7].merge(#1) |
| 9 | =file("orders_new.ctx").create(#o_orderkey,o_custkey,o_orderstatus,o_totalprice,o_orderdate,o_orderpriority,o_clerk,o_shippriority,o_comment) |
| 10 | >A9.append@i(A8) |
| 11 | >movefile("orders_new.ctx","orders.ctx") |
| 12 | =file("lineitem.ctx").open().cursor() |
| 13 | =[A6,A12].merge(#1,#2) |
| 14 | =file("lineitem_new.ctx").create(#l_orderkey,#l_linenumber,l_partkey,l_suppkey,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment) |
| 15 | >A14.append@i(A13) |
| 16 | >movefile("lineitem_new.ctx","lineitem.ctx") |
A2: st is the maximum O_ORDERDATE in the ORDERS composite table.
A3: now() returns the current system time.
A4: maxkey is the maximum O_ORDERKEY in the ORDERS composite table.
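The merge-then-swap pattern of the script can be illustrated with a small Python sketch. It is not SPL: the CSV file stands in for the composite table, `read_rows`, `write_rows` and `merge_increment` are hypothetical helpers, and for brevity the increment is selected by key only (rows above the stored maximum key) rather than by timestamp. The merged result is written to a new file first and only then swapped over the old one.

```python
import csv
import heapq
import os
import tempfile

def read_rows(path):
    """Stream (orderkey, status) rows from a file that is sorted by orderkey."""
    with open(path, newline="") as f:
        for key, status in csv.reader(f):
            yield int(key), status

def write_rows(path, rows):
    with open(path, "w", newline="") as f:
        csv.writer(f).writerows(rows)

def merge_increment(cold_path, source_rows):
    """Merge source rows newer than the cold file into it; return how many were added."""
    max_key = max((key for key, _ in read_rows(cold_path)), default=0)
    increment = sorted(row for row in source_rows if row[0] > max_key)
    new_path = cold_path + ".new"
    # Both inputs are sorted by key, so a streaming merge keeps the output sorted.
    write_rows(new_path, heapq.merge(read_rows(cold_path), increment))
    # Swap the new file in only after it has been written completely.
    os.replace(new_path, cold_path)
    return len(increment)

with tempfile.TemporaryDirectory() as tmp_dir:
    cold = os.path.join(tmp_dir, "orders.csv")
    write_rows(cold, [(1, "O"), (2, "F"), (3, "O")])
    source = [(k, "P") for k in range(1, 6)]  # pretend database table
    added = merge_increment(cold, source)
    merged = list(read_rows(cold))
    leftover = os.path.exists(cold + ".new")
```

Writing to a separate file and renaming it at the end means readers of the old file never see a half-written result, which is the role the `_new.ctx` files play in the script above.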
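II. Routine Computation on Cold Data
Once the dump is complete, high-performance computation can run on the SPL composite table files, including but not limited to filtering, grouping and aggregation, and joins between tables. Routine computations are covered in the article on routine operations on SPL files; only two examples are given here, TPC-H Q1 and Q3.
1. Routine grouping and aggregation with a small result set
SQL example: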
SELECT
    l_returnflag,
    l_linestatus,
    sum(l_quantity) AS sum_qty,
    sum(l_extendedprice) AS sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
    avg(l_quantity) AS avg_qty,
    avg(l_extendedprice) AS avg_price,
    avg(l_discount) AS avg_disc,
    count(*) AS count_order
FROM
    lineitem
WHERE
    l_shipdate <= date '1995-12-01' - INTERVAL '90' DAY(3)
GROUP BY
    l_returnflag,
    l_linestatus
ORDER BY
    l_returnflag,
    l_linestatus;
The corresponding SPL script:
|   | A |
| 1 | =now() |
| 2 | 1995-12-01 |
| 3 | =A2-90 |
| 4 | =file("lineitem.ctx").open().cursor@m(l_shipdate,l_quantity, l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus;l_shipdate<=A3) |
| 5 | =A4.groups(l_returnflag, l_linestatus; sum(l_quantity):sum_qty, sum(l_extendedprice):sum_base_price, sum(dp=l_extendedprice*(1-l_discount)):sum_disc_price, sum(dp*l_tax):sum_charge, avg(l_quantity):avg_qty, avg(l_extendedprice):avg_price, avg(l_discount):avg_disc, count(1):count_order) |
| 6 | =A5.run(sum_charge+=sum_disc_price) |
| 7 | =interval@ms(A1,now()) |
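A5 and A6 compute sum_charge in two steps: the grouping accumulates sum(dp*l_tax), and the discounted total is added once per group afterwards. This relies on the identity sum(dp*(1+tax)) = sum(dp*tax) + sum(dp). The Python sketch below checks that identity on a few made-up rows; `q1_charges` and the sample values are hypothetical and only illustrate the arithmetic.

```python
import math
from collections import defaultdict

def q1_charges(rows):
    """Group by (returnflag, linestatus), reusing dp for both aggregates."""
    groups = defaultdict(lambda: {"sum_disc_price": 0.0, "sum_charge": 0.0})
    for flag, status, price, discount, tax in rows:
        dp = price * (1 - discount)      # discounted price, computed once per row
        g = groups[(flag, status)]
        g["sum_disc_price"] += dp
        g["sum_charge"] += dp * tax      # only the tax part is accumulated per row
    for g in groups.values():
        g["sum_charge"] += g["sum_disc_price"]   # one addition per group
    return dict(groups)

# Made-up rows: (returnflag, linestatus, extendedprice, discount, tax)
rows = [
    ("A", "F", 100.0, 0.5, 0.25),
    ("A", "F", 200.0, 0.25, 0.5),
    ("N", "O", 80.0, 0.0, 0.125),
]
result = q1_charges(rows)
direct = sum(p * (1 - d) * (1 + t) for f, s, p, d, t in rows if (f, s) == ("A", "F"))
matches = math.isclose(result[("A", "F")]["sum_charge"], direct)
```

The two-step form saves one multiplication per row, and the final adjustment touches only the handful of result groups.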
2. Grouping and aggregation after joining a primary table with its sub-table
SQL example:
SELECT
    *
FROM
    (
        SELECT
            l_orderkey,
            sum(l_extendedprice * (1 - l_discount)) AS revenue,
            o_orderdate,
            o_shippriority
        FROM
            customer,
            orders,
            lineitem
        WHERE
            c_mktsegment = 'BUILDING'
            AND c_custkey = o_custkey
            AND l_orderkey = o_orderkey
            AND o_orderdate < date '1995-03-15'
            AND l_shipdate > date '1995-03-15'
        GROUP BY
            l_orderkey,
            o_orderdate,
            o_shippriority
        ORDER BY
            revenue DESC,
            o_orderdate
    )
WHERE
    rownum <= 10;
The corresponding SPL script:
|   | A |
| 1 | =now() |
| 2 | 1995-3-15 |
| 3 | >mktsegment="BUILDING" |
| 4 | =file("customer.ctx").open().cursor@m(c_custkey;c_mktsegment==mktsegment).fetch().keys@im(c_custkey) |
| 5 | =file("orders.ctx").open().cursor@m(o_orderkey,o_orderdate,o_shippriority;o_orderdate<A2 && A4.find(o_custkey)) |
| 6 | =file("lineitem.ctx").open().news@r(A5,o_orderkey, sum(l_extendedprice*(1-l_discount)):revenue,o_orderdate,o_shippriority;l_shipdate>A2) |
| 7 | =A6.total(top(10;-revenue,o_orderdate)) |
| 8 | =interval@ms(A1,now()) |
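The shape of this computation (filter customers into a key set, filter orders against it, aggregate the order lines per order, then keep the top 10) can be shown in a few lines of Python. This is a rough sketch of the logic only, not of how SPL executes it; `q3_top` and the sample tuples are hypothetical, dates are ISO strings so that string comparison matches date order, and `lineitems` is assumed to be sorted by order key.

```python
import heapq
from itertools import groupby

def q3_top(customers, orders, lineitems, segment, cutoff, n=10):
    # Step 1: customer keys in the wanted market segment.
    cust_keys = {custkey for custkey, seg in customers if seg == segment}
    # Step 2: orders before the cutoff date that belong to those customers.
    kept = {
        orderkey: (orderdate, shippriority)
        for orderkey, custkey, orderdate, shippriority in orders
        if orderdate < cutoff and custkey in cust_keys
    }
    # Step 3: walk the order lines group by group (they are sorted by order key).
    results = []
    for orderkey, items in groupby(lineitems, key=lambda li: li[0]):
        if orderkey not in kept:
            continue
        revenues = [price * (1 - disc)
                    for _, price, disc, shipdate in items if shipdate > cutoff]
        if revenues:
            results.append((orderkey, sum(revenues), *kept[orderkey]))
    # Step 4: top n by revenue descending, then order date ascending.
    return heapq.nsmallest(n, results, key=lambda r: (-r[1], r[2]))

customers = [(1, "BUILDING"), (2, "AUTOMOBILE")]
orders = [(10, 1, "1995-03-01", 0), (11, 2, "1995-03-01", 0),
          (12, 1, "1995-03-20", 0), (13, 1, "1995-02-01", 0)]
lineitems = [(10, 100.0, 0.5, "1995-03-20"), (10, 200.0, 0.0, "1995-03-10"),
             (11, 50.0, 0.0, "1995-03-20"), (12, 80.0, 0.0, "1995-04-01"),
             (13, 300.0, 0.5, "1995-03-16"), (13, 10.0, 0.0, "1995-03-17")]
top = q3_top(customers, orders, lineitems, "BUILDING", "1995-03-15")
```

Keeping only the top n while scanning avoids sorting the full grouped result, which is the same intent as the top(10;...) aggregate in A7.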
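More TPC-H optimization examples can be found in the series on practicing performance optimization with TPC-H.
Data analysts are also encouraged to read SPL CookBook. It collects hundreds of common data processing tasks together with the corresponding SPL code and covers most of the scenarios analysts face. Mastering these tasks and combining them makes routine data analysis much easier.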
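III. Mixed Computation on Cold and Hot Data
When the hot data is small enough to fit in memory, it can be read directly from the database and merged in at computation time. Taking TPC-H Q1 as an example: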
|   | A |
| 1 | =now() |
| 2 | 1995-12-01 |
| 3 | =A2-90 |
| 4 | =maxkey |
| 5 | =connect@l("oracle12c") |
| 6 | =A5.query@x("SELECT L_ORDERKEY,L_LINENUMBER,L_PARTKEY,L_SUPPKEY,L_QUANTITY,L_EXTENDEDPRICE,L_DISCOUNT,L_TAX,L_RETURNFLAG,L_LINESTATUS,L_SHIPDATE,L_COMMITDATE,L_RECEIPTDATE,L_SHIPINSTRUCT,L_SHIPMODE,L_COMMENT FROM LINEITEM WHERE L_ORDERKEY>? ORDER BY 1,2",A4) |
| 7 | =file("lineitem.ctx").open() |
| 8 | =A7.update@y(A6:) |
| 9 | =A7.cursor@m(l_shipdate,l_quantity, l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus;l_shipdate<=A3) |
| 10 | =A9.groups(l_returnflag, l_linestatus; sum(l_quantity):sum_qty, sum(l_extendedprice):sum_base_price, sum(dp=l_extendedprice*(1-l_discount)):sum_disc_price, sum(dp*l_tax):sum_charge, avg(l_quantity):avg_qty, avg(l_extendedprice):avg_price, avg(l_discount):avg_disc, count(1):count_order) |
| 11 | =A10.run(sum_charge+=sum_disc_price) |
| 12 | =interval@ms(A1,now()) |
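A4: maxkey is the maximum L_ORDERKEY in the LINEITEM composite table.
A8: update@y keeps the new data in memory and merges it in at computation time. update@y also supports handling deleted data.
SPL also provides an OGG external library. For tables whose changes cannot be determined directly or indirectly from a timestamp field, update information can be obtained from the logs when the data changes.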
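Conceptually, merging at computation time means the cold file is never rewritten: the hot rows sit in memory and are woven into the sorted cold stream as it is read. The Python sketch below illustrates that idea under assumptions of its own. It does not describe how update@y is implemented, and `merged_cursor`, its `deleted_keys` parameter and the sample rows are hypothetical. A hot row replaces a cold row with the same key, and keys marked as deleted are skipped.

```python
def merged_cursor(cold_rows, hot_rows, deleted_keys=frozenset()):
    """Yield (key, value) rows in key order, overlaying in-memory hot rows on a cold stream."""
    hot = dict(hot_rows)
    pending = sorted(hot)          # hot keys in ascending order
    i = 0
    for key, value in cold_rows:   # cold_rows must be sorted by key
        # Emit hot rows whose keys fall before the current cold key.
        while i < len(pending) and pending[i] < key:
            k = pending[i]
            i += 1
            if k not in deleted_keys:
                yield k, hot[k]
        # A hot row with the same key overrides the cold row.
        if i < len(pending) and pending[i] == key:
            i += 1
            value = hot[key]
        if key not in deleted_keys:
            yield key, value
    # Hot rows beyond the last cold key (the usual case for newly added data).
    for k in pending[i:]:
        if k not in deleted_keys:
            yield k, hot[k]

cold = [(1, "a"), (2, "b"), (4, "d")]
hot = [(2, "B"), (3, "c"), (5, "e")]
rows = list(merged_cursor(iter(cold), hot, deleted_keys={1}))
```

Because the overlay is a generator, downstream filtering and grouping can consume it as a single stream, much as A9 reads from A7 after the update in the script above.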