SPL Practice: Migrating Computing Tasks Out of the Database

I. Moving Data Out

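When the TP database is overloaded, SPL can take over the AP workload. The first step is to move the data out of the TP database.

Data in a relational database is usually retrieved over a JDBC connection. This article uses Oracle as the example, with data structures following TPC-H.

The script below dumps the ORDERS table into one of SPL's high-performance composite table files: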


	A
1	=connect@l("oracle12c")
2	=A1.cursor@x("SELECT O_ORDERKEY,O_CUSTKEY,O_ORDERSTATUS,O_TOTALPRICE,O_ORDERDATE,O_ORDERPRIORITY,O_CLERK,O_SHIPPRIORITY,O_COMMENT FROM ORDERS ORDER BY 1")
3	=file("orders.ctx").create(#o_orderkey,o_custkey,o_orderstatus,o_totalprice,o_orderdate,o_orderpriority,o_clerk,o_shippriority,o_comment)
4	>A3.append@i(A2)

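Retrieving data over JDBC is slow, and parallel retrieval across multiple cores can speed it up. The LINEITEM order detail table is large, so the single-threaded script above would take a long time on it.

The data can be split into n parts by the remainder of L_ORDERKEY divided by n (0, ..., n-1). Each part is retrieved by its own thread and dumped to a bin file, and the n bin files are finally merged into one composite table file. The script is as follows: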


	A	B
1	fork 4.()	=connect@l("oracle12c")
2		=B1.cursor@x("SELECT L_ORDERKEY,L_LINENUMBER,L_PARTKEY,L_SUPPKEY,L_QUANTITY,L_EXTENDEDPRICE,L_DISCOUNT,L_TAX,L_RETURNFLAG,L_LINESTATUS,L_SHIPDATE,L_COMMITDATE,L_RECEIPTDATE,L_SHIPINSTRUCT,L_SHIPMODE,L_COMMENT FROM LINEITEM WHERE MOD(L_ORDERKEY,4)="/(A1-1)/" ORDER BY 1,2")
3		=file("lineitem"/A1/".btx").export@b(B2)
4	=directory("lineitem?.btx")
5	=A4.(file(~).cursor@b()).merge(#1,#2)
6	=file("lineitem.ctx").create(#l_orderkey,#l_linenumber,l_partkey,l_suppkey,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment)
7	>A6.append@i(A5)
8	=A4.(movefile(~))

A1: fork n.(), where n = 4 is the number of parallel threads.
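The split-and-merge idea can be sketched outside SPL. The Python below is only an illustration of the logic, not of the SPL API: the row layout (order key, line number) and the function names are assumptions made for this example. Each part is sorted on its own, so a k-way merge on the same key is enough to restore the global order.

```python
import heapq


def sort_key(row):
    # Rows are (order_key, line_number), matching ORDER BY 1,2.
    return (row[0], row[1])


def partition_by_remainder(rows, n):
    """Split rows into n parts by order_key % n and sort each part."""
    parts = [[] for _ in range(n)]
    for row in rows:
        parts[row[0] % n].append(row)
    return [sorted(part, key=sort_key) for part in parts]


def merge_parts(parts):
    """Merge the already sorted parts into one globally sorted list."""
    return list(heapq.merge(*parts, key=sort_key))


# Hypothetical rows in arbitrary order.
rows = [(7, 1), (2, 2), (5, 1), (2, 1), (8, 1), (7, 2), (4, 1), (1, 1)]
parts = partition_by_remainder(rows, 4)
merged = merge_parts(parts)
print(merged)
```

Because every order key lands in exactly one part, no row is read twice and none is lost, whatever value of n is chosen.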

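When the database holds a very large amount of data and is close to saturation, an ORDER BY in the SQL forces it to sort a large volume of data before returning it, and the database may stall during the sort. To avoid this, leave the sorting to SPL's cursor sort (cs.sortx). While the script runs, you can watch whether sortx keeps producing temporary files to judge whether the program is working normally.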
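To show why temporary files keep appearing during such a sort, here is a minimal external-sort sketch in Python. It is not how sortx is implemented; the chunk size, file format (pickle), and function name are assumptions chosen for illustration. Each full chunk is sorted in memory and spilled to a temporary file, and the files are merged at the end.

```python
import heapq
import os
import pickle
import tempfile


def external_sort(rows, chunk_size, key):
    """Sort rows chunk by chunk, spilling sorted chunks to temp files, then merge."""
    temp_paths = []
    chunk = []

    def spill():
        # Write one sorted chunk to its own temporary file, row by row.
        fd, path = tempfile.mkstemp(suffix=".tmp")
        with os.fdopen(fd, "wb") as f:
            for row in sorted(chunk, key=key):
                pickle.dump(row, f)
        temp_paths.append(path)
        chunk.clear()

    for row in rows:
        chunk.append(row)
        if len(chunk) >= chunk_size:
            spill()
    if chunk:
        spill()

    def read(path):
        # Stream rows back from a temporary file one at a time.
        with open(path, "rb") as f:
            while True:
                try:
                    yield pickle.load(f)
                except EOFError:
                    return

    try:
        result = list(heapq.merge(*(read(p) for p in temp_paths), key=key))
    finally:
        for p in temp_paths:
            os.remove(p)
    return result, len(temp_paths)


rows = [(5, 1), (3, 2), (9, 1), (1, 1), (3, 1), (7, 1), (2, 1)]
result, file_count = external_sort(rows, chunk_size=3, key=lambda r: r)
print(result, file_count)
```

In this sketch a new file is written every chunk_size rows, so a steady stream of temporary files is a sign that input is still being consumed.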

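Incremental data can be fetched on a schedule, for example at a fixed time each day, based on a timestamp field.

Assume that O_ORDERDATE in the ORDERS table is the timestamp field and that O_ORDERKEY is increasing, while the LINEITEM table references the O_ORDERKEY primary key of ORDERS through its L_ORDERKEY foreign key. The script below fetches the incremental data and merges it with the cold data: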


	A
1	=connect@l("oracle12c")
2	=st
3	=now()
4	=maxkey
5	=A1.cursor("SELECT O_ORDERKEY,O_CUSTKEY,O_ORDERSTATUS,O_TOTALPRICE,O_ORDERDATE,O_ORDERPRIORITY,O_CLERK,O_SHIPPRIORITY,O_COMMENT FROM ORDERS WHERE O_ORDERDATE>? AND O_ORDERDATE<=? ORDER BY 1",A2,A3)
6	=A1.cursor@x("SELECT L_ORDERKEY,L_LINENUMBER,L_PARTKEY,L_SUPPKEY,L_QUANTITY,L_EXTENDEDPRICE,L_DISCOUNT,L_TAX,L_RETURNFLAG,L_LINESTATUS,L_SHIPDATE,L_COMMITDATE,L_RECEIPTDATE,L_SHIPINSTRUCT,L_SHIPMODE,L_COMMENT FROM LINEITEM WHERE L_ORDERKEY>? ORDER BY 1,2",A4)
7	=file("orders.ctx").open().cursor()
8	=[A5,A7].merge(#1)
9	=file("orders_new.ctx").create(#o_orderkey,o_custkey,o_orderstatus,o_totalprice,o_orderdate,o_orderpriority,o_clerk,o_shippriority,o_comment)
10	>A9.append@i(A8)
11	>movefile("orders_new.ctx","orders.ctx")
12	=file("lineitem.ctx").open().cursor()
13	=[A6,A12].merge(#1,#2)
14	=file("lineitem_new.ctx").create(#l_orderkey,#l_linenumber,l_partkey,l_suppkey,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment)
15	>A14.append@i(A13)
16	>movefile("lineitem_new.ctx","lineitem.ctx")

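A2: st is the maximum O_ORDERDATE in the ORDERS composite table.

A3: now() returns the current system time.

A4: maxkey is the maximum O_ORDERKEY in the ORDERS composite table.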
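The condition O_ORDERDATE>? AND O_ORDERDATE<=? defines a window that is open on the left and closed on the right, so consecutive runs neither overlap nor leave gaps. The Python sketch below illustrates this with hypothetical rows of (order key, timestamp); the function name and the sample timestamps are assumptions made for the example.

```python
from datetime import datetime


def select_increment(source_rows, st, now):
    """Return rows with st < timestamp <= now, ordered by order key."""
    picked = [row for row in source_rows if st < row[1] <= now]
    return sorted(picked, key=lambda row: row[0])


# Hypothetical source table: (order_key, order_timestamp).
source = [
    (1, datetime(2024, 1, 1, 9)),
    (2, datetime(2024, 1, 1, 18)),
    (3, datetime(2024, 1, 2, 0)),  # falls exactly on the first run's upper bound
    (4, datetime(2024, 1, 2, 7)),
]

st = datetime(2024, 1, 1, 0)   # largest timestamp already stored
t1 = datetime(2024, 1, 2, 0)   # "now" at the first run
t2 = datetime(2024, 1, 3, 0)   # "now" at the second run

first_run = select_increment(source, st, t1)
second_run = select_increment(source, t1, t2)
print([row[0] for row in first_run], [row[0] for row in second_run])
```

The row stamped exactly at t1 is picked up by the first run only, because the second run starts strictly after t1.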

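II. Regular Computation on Cold Data

Once the dump is complete, high-performance computation can run on the SPL composite table files, including but not limited to conditional filtering, grouping and aggregation, and joins between tables. For regular computations, see "Regular Operations on SPL Files". Only two examples are given here, TPC-H Q1 and Q3:


1. Regular grouping and aggregation with a small result set

SQL example: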

SELECT
	l_returnflag,
	l_linestatus,
	sum(l_quantity) AS sum_qty,
	sum(l_extendedprice) AS sum_base_price,
	sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
	sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
	avg(l_quantity) AS avg_qty,
	avg(l_extendedprice) AS avg_price,
	avg(l_discount) AS avg_disc,
	count(*) AS count_order
FROM
	lineitem
WHERE
	l_shipdate <= date '1995-12-01' - INTERVAL '90' DAY(3)
GROUP BY
	l_returnflag,
	l_linestatus
ORDER BY
	l_returnflag,
	l_linestatus;

The corresponding SPL script:


	A
1	=now()
2	1995-12-01
3	=A2-90
4	=file("lineitem.ctx").open().cursor@m(l_shipdate,l_quantity, l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus;l_shipdate<=A3)
5	=A4.groups(l_returnflag, l_linestatus; sum(l_quantity):sum_qty, sum(l_extendedprice):sum_base_price, sum(dp=l_extendedprice*(1-l_discount)):sum_disc_price, sum(dp*l_tax):sum_charge, avg(l_quantity):avg_qty, avg(l_extendedprice):avg_price, avg(l_discount):avg_disc, count(1):count_order)
6	=A5.run(sum_charge+=sum_disc_price)
7	=interval@ms(A1,now())
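The script computes sum_charge in two steps: A5 accumulates dp*l_tax, reusing the discounted price dp, and A6 then adds sum_disc_price. This matches the SQL expression because dp*(1+tax) = dp + dp*tax, so the sums agree as well. The Python check below uses three made-up rows and exact fractions to confirm the identity; the values are purely illustrative.

```python
from fractions import Fraction as F

# Hypothetical rows: (l_extendedprice, l_discount, l_tax).
rows = [
    (F("100.00"), F("0.05"), F("0.08")),
    (F("250.50"), F("0.10"), F("0.02")),
    (F("80.25"), F("0.00"), F("0.06")),
]

# Direct form, as written in the SQL.
direct = sum(p * (1 - d) * (1 + t) for p, d, t in rows)

# Two-step form, as written in the SPL script.
sum_disc_price = sum(p * (1 - d) for p, d, t in rows)
sum_charge = sum(p * (1 - d) * t for p, d, t in rows)
sum_charge += sum_disc_price

print(direct == sum_charge)
```

The two-step form saves one multiplication per row, which adds up over a table the size of LINEITEM.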

2. Grouping and aggregation after joining a primary table with its sub-table

SQL example:

SELECT
	*
FROM
	(
	SELECT
		l_orderkey,
		sum(l_extendedprice * (1 - l_discount)) AS revenue,
		o_orderdate,
		o_shippriority
	FROM
		customer,
		orders,
		lineitem
	WHERE
		c_mktsegment = 'BUILDING'
		AND c_custkey = o_custkey
		AND l_orderkey = o_orderkey
		AND o_orderdate < date '1995-03-15'
		AND l_shipdate > date '1995-03-15'
	GROUP BY
		l_orderkey,
		o_orderdate,
		o_shippriority
	ORDER BY
		revenue DESC,
		o_orderdate )
WHERE
	rownum <= 10;

The corresponding SPL script:


	A
1	=now()
2	1995-3-15
3	>mktsegment="BUILDING"
4	=file("customer.ctx").open().cursor@m(c_custkey;c_mktsegment==mktsegment).fetch().keys@im(c_custkey)
5	=file("orders.ctx").open().cursor@m(o_orderkey,o_orderdate,o_shippriority;o_orderdate<A2 && A4.find(o_custkey))
6	=file("lineitem.ctx").open().news@r(A5,o_orderkey, sum(l_extendedprice*(1-l_discount)):revenue,o_orderdate,o_shippriority;l_shipdate>A2)
7	=A6.total(top(10;-revenue,o_orderdate))
8	=interval@ms(A1,now())
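The order of operations in this script (filter customers, filter orders against the customer keys, aggregate the detail rows per order, then take the top 10) can be followed in the Python sketch below. It is only a logical illustration with hypothetical tuples; it uses dictionary lookups where the SPL script relies on an ordered primary-sub merge, so it says nothing about SPL's performance.

```python
import heapq
from collections import defaultdict
from datetime import date


def q3(customers, orders, lineitems, segment, cutoff, limit=10):
    """customers: (custkey, segment); orders: (orderkey, custkey, orderdate, shippriority);
    lineitems: (orderkey, extendedprice, discount, shipdate)."""
    # Step 1: keys of customers in the requested market segment.
    cust_keys = {c[0] for c in customers if c[1] == segment}
    # Step 2: orders placed before the cutoff by those customers.
    kept = {o[0]: o for o in orders if o[2] < cutoff and o[1] in cust_keys}
    # Step 3: revenue per kept order from detail rows shipped after the cutoff.
    revenue = defaultdict(float)
    for orderkey, price, discount, shipdate in lineitems:
        if shipdate > cutoff and orderkey in kept:
            revenue[orderkey] += price * (1 - discount)
    rows = [(k, rev, kept[k][2], kept[k][3]) for k, rev in revenue.items()]
    # Step 4: top rows by revenue descending, then order date ascending.
    return heapq.nsmallest(limit, rows, key=lambda r: (-r[1], r[2]))


cutoff = date(1995, 3, 15)
customers = [(1, "BUILDING"), (2, "AUTOMOBILE")]
orders = [
    (10, 1, date(1995, 3, 1), 0),
    (11, 2, date(1995, 3, 1), 0),   # wrong segment
    (12, 1, date(1995, 3, 20), 0),  # ordered too late
]
lineitems = [
    (10, 100.0, 0.0, date(1995, 3, 16)),
    (10, 50.0, 0.5, date(1995, 3, 20)),
    (10, 999.0, 0.0, date(1995, 3, 10)),  # shipped too early
    (11, 500.0, 0.0, date(1995, 3, 20)),
    (12, 300.0, 0.0, date(1995, 3, 25)),
]
top = q3(customers, orders, lineitems, "BUILDING", cutoff)
print(top)
```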

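For more TPC-H optimization examples, see "TPC-H Performance Optimization Practice".

In addition, data analysts are encouraged to read SPL CookBook. The book collects hundreds of common data processing tasks together with the corresponding SPL code and covers most of the scenarios data analysts face. Once you have learned how these tasks are implemented and how to combine them, routine data analysis and processing become easy to handle.

III. Mixed Computation on Cold and Hot Data

When the hot data is small enough to fit in memory, it can be read directly from the database and merged in at computation time. Taking TPC-H Q1 as an example: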


	A
1	=now()
2	1995-12-01
3	=A2-90
4	=maxkey
5	=connect@l("oracle12c")
6	=A5.query@x("SELECT L_ORDERKEY,L_LINENUMBER,L_PARTKEY,L_SUPPKEY,L_QUANTITY,L_EXTENDEDPRICE,L_DISCOUNT,L_TAX,L_RETURNFLAG,L_LINESTATUS,L_SHIPDATE,L_COMMITDATE,L_RECEIPTDATE,L_SHIPINSTRUCT,L_SHIPMODE,L_COMMENT FROM LINEITEM WHERE L_ORDERKEY>? ORDER BY 1,2",A4)
7	=file("lineitem.ctx").open()
8	=A7.update@y(A6:)
9	=A7.cursor@m(l_shipdate,l_quantity, l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus;l_shipdate<=A3)
10	=A9.groups(l_returnflag, l_linestatus; sum(l_quantity):sum_qty, sum(l_extendedprice):sum_base_price, sum(dp=l_extendedprice*(1-l_discount)):sum_disc_price, sum(dp*l_tax):sum_charge, avg(l_quantity):avg_qty, avg(l_extendedprice):avg_price, avg(l_discount):avg_disc, count(1):count_order)
11	=A10.run(sum_charge+=sum_disc_price)
12	=interval@ms(A1,now())

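A4: maxkey is the maximum L_ORDERKEY in the LINEITEM composite table.

A8: update@y keeps the newly added data in memory and merges it in at computation time. update@y also supports handling deleted data.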
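The idea of holding new rows in memory and merging them only when the data is read can be sketched as follows. This Python class is a conceptual stand-in and not the SPL API: the class name, method names, and row layout (order key, line number, return flag, quantity) are all assumptions, and deletions are not modeled.

```python
import heapq
from collections import defaultdict


def row_key(row):
    return (row[0], row[1])


class PatchedTable:
    """Cold rows stay as stored; hot rows live in memory and are merged on read."""

    def __init__(self, cold_rows):
        self.cold = list(cold_rows)  # sorted by key, never rewritten here
        self.patch = []              # in-memory hot rows, kept sorted

    def update(self, new_rows):
        self.patch = sorted(self.patch + list(new_rows), key=row_key)

    def cursor(self):
        # Readers see one ordered stream of cold and hot rows.
        return heapq.merge(self.cold, self.patch, key=row_key)


def quantity_by_flag(rows):
    totals = defaultdict(int)
    for _, _, flag, qty in rows:
        totals[flag] += qty
    return dict(totals)


cold = [(1, 1, "N", 10), (1, 2, "R", 5), (2, 1, "N", 7)]
table = PatchedTable(cold)
table.update([(3, 1, "N", 4), (3, 2, "A", 2)])
totals = quantity_by_flag(table.cursor())
print(totals)
```

The cold rows are left untouched, so a query like this can include the latest data without rewriting the stored file on every run.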

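SPL also provides an OGG external library. For tables whose updated content cannot be determined, directly or indirectly, from a timestamp field, it can obtain the update information from logs whenever the data changes.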