SPL 轻量级多源混算实践 5 - 跨库 Union,比对
相同结构的数据按照年份存储到不同数据库时,要进行数据统计就会涉及多库混合计算。事实上,不管是数据库还是其他任何存储介质,相同结构数据合并都是类似的,只是读取数据这一步有所不同(不同数据源有不同的接口 / 函数)。
配置数据库连接
前面配置了 dba,再配置一个 dbb,连接串信息:
jdbc:mysql://127.0.0.1:3306/bytedbb?useSSL=false&useCursorFetch=true
混算
把两个表的数据合并一起计算。
A |
|
1 |
=connect("dba") |
2 |
=A1.query@x("select * from orders") |
3 |
=connect("dbb") |
4 |
=A3.query@x("select * from orders") |
5 |
=A2|A4 |
6 |
=A5.groups(product_id;sum(total_amount):tamt) |
A2 和 A4 分别查询两个库的 orders 数据,@x 选项表示查询后关闭连接。A5 使用“|”符号合并两部分数据,就这么简单。然后 A6 基于合并结果进行后续计算(这里是分组汇总)。
运行查看结果:
然后发现两个库的数据有重复,需要去重后再计算。
A |
|
1 |
=connect("dba") |
2 |
=A1.query@x("select * from orders") |
3 |
=connect("dbb") |
4 |
=A3.query@x("select * from orders") |
5 |
=A2|A4 |
6 |
=A5.group@1(order_id) |
7 |
=A6.groups(product_id;sum(total_amount):tamt) |
A6 使用 group@1 对 order_id 分组且只保留分组中的第一条记录,这样就去除了重复。如果想根据条件保留记录(比如时间最近的),就可以先排序再 group@1,很灵活。
查看 A6 的结果,发现原来两表中都包含 id 是 10001,10002 等重复数据都去除了,A7 再进行分组汇总。
后续的计算仍然跟单表一样,如果想做其他计算只需要换个计算表达式就可以。
能做混合计算,也就能顺便解决数据比对任务。比如查找两库都有的订单、仅存在一个库的订单等。
A |
|
1 |
=connect("dba") |
2 |
=A1.query@x("select * from orders") |
3 |
=connect("dbb") |
4 |
=A3.query@x("select * from orders") |
5 |
=join@f(A2:a,order_id;A4:b,order_id) |
6 |
=A5.select(a && b) |
7 |
=A5.select(!b).(a) |
8 |
=A5.select(!a).(b) |
9 |
=A5.select(a && b && (${A2.fname().("a."/~/"!=b."/~).concat("||")})) |
这里 A5 使用 join 做全连接,A6 筛选重复订单(交集),A7 和 A8 分别筛选不重复订单(差集),A9 筛选 order_id 重复但其他列不同的记录,这里用到宏来简化书写,${A2.fname().("~(1)."/~/"!=~(2)."/~).concat("||") 展开后是这样:~(1).quantity != ~(2).quantity || ~(1).unit_price != ~(2).unit_price || ~(1).total_amount != ~(2).total_amount || ~(1).order_status != ~(2).order_status
运行后可以看到比对结果:
大数据情况
如果数据量比较大不能把数据全部读入内存,就需要使用 SPL 游标机制完成混合计算。
如果不需要去重,简单把把两个游标合并到一起计算就行:
A |
|
1 |
=connect("dba") |
2 |
=A1.cursor@x("select * from orders") |
3 |
=connect("dbb") |
4 |
=A3.cursor@x("select * from orders") |
5 |
=[A2,A4].conj() |
6 |
=A5.groups(product_id;sum(total_amount):tamt) |
A2 和 A4 使用 cursor 函数查询数据,在 A5 中合并两个游标,在 A6 进行计算。整体跟全内存计算差别不大。
执行脚本, A5 返回的是游标对象,如果想查看里面的内容可以点击“load data”:
如果要先做去重,需要游标保持有序才能方便比较相邻数据。这里要在 SQL 中按 order_id 排序。
A |
|
1 |
=connect("dba") |
2 |
=A1.cursor@x("select * from orders order by order_id") |
3 |
=connect("dbb") |
4 |
=A3.cursor@x("select * from orders order by order_id") |
5 |
=[A2,A4].merge@u(order_id) |
6 |
=A5.groups(product_id;sum(total_amount):tamt) |
归并有序游标需要使用 CS.merge() 函数,merge 提供了很多选项,@u 表示求并集,所以直接就去重了。还有 @i 表示交集,@d 表示差集。后续的计算就都一样了。
Merge 后返回的仍是游标(并不进行实质的计算):
执行到最后的分组汇总才开始计算并返回结果。
整体上大数据情况的计算过程与全内存时基本一致,可以有效降低使用门槛。
大数据的比对也可以做:
A |
B |
|
1 |
=connect("dba") |
|
2 |
=A1.query("select column_name from information_schema.columns where table_schema ='bytedba'and table_name ='orders'") |
|
3 |
=A1.cursor@x("select * from orders order by 1") |
|
4 |
=connect("dbb") |
|
5 |
=A4.cursor@x("select * from orders order by 1") |
|
6 |
=joinx@f(A3:a,order_id;A5:b,order_id) |
|
7 |
=A6.select(a && b) |
=A7.fetch() |
8 |
=A6.select(!b).(a) |
=A8.fetch() |
9 |
=A6.select(!a).(b) |
=A9.fetch() |
10 |
=A5.select(a && b && (${A2.(#1).("a."/~/"!=b."/~).concat("||")})) |
=A10.fetch() |
由于 A6-A10 返回的都是游标,所以需要在 B7-B10 上增加结果集函数来执行计算并获取结果。
但奇怪的是只有 B7 有结果,B8-B10 都是空的。
这是因为游标是一次性的,一次遍历完就结束了,后面的计算也就没法再进行了。这时就要借助 esProc 提供的游标复用(管道)机制,大数据情况下一次遍历完成多个计算。我们来改造一下代码:
A |
B |
C |
|
1 |
=connect("dba") |
||
2 |
=A1.query("select column_name from information_schema.columns where table_schema ='bytedba'and table_name ='orders'") |
||
3 |
=A1.cursor@x("select * from orders order by 1") |
||
4 |
=connect("dbb") |
||
5 |
=A4.cursor@x("select * from orders order by 1") |
||
6 |
=joinx@f(A3:a,order_id;A5:b,order_id) |
||
7 |
cursor A6 |
=A7.select(a && b) |
=B7.fetch() |
8 |
cursor |
=A8.select(!b).(a) |
=B8.fetch() |
9 |
cursor |
=A9.select(!a).(b) |
=B9.fetch() |
10 |
cursor |
=A10.select(a && b && (${A2.(#1).("a."/~/"!=b."/~).concat("||")})) |
=B10.fetch() |
A7-A10 基于 A5 的游标创建管道(A8-A10 是省略写法),剩下 B7-C10 的运算跟上面就完全一样了。运行后我们可以在 A7-A10(注意不是 C7-C10)格看到计算后的结果。
如果计算结果也比较大没法全内存,还可以将结果输出到文件。再改造一下上面的代码:
A |
B |
C |
|
1 |
=connect("dba") |
||
2 |
=A1.query("select column_name from information_schema.columns where table_schema ='bytedba'and table_name ='orders'") |
||
3 |
=A1.cursor@x("select * from orders order by 1") |
||
4 |
=connect("dbb") |
||
5 |
=A4.cursor@x("select * from orders order by 1") |
||
6 |
=joinx@f(A3:a,order_id;A5:b,order_id) |
||
7 |
cursor A6 |
=A7.select(a && b) |
>B7.fetch("intersec.btx") |
8 |
cursor |
=A8.select(!b).(a) |
>B8.fetch("diff_a.btx") |
9 |
cursor |
=A9.select(!a).(b) |
>B9.fetch("diff_b.btx") |
10 |
cursor |
=A10.select(a && b && (${A2.(#1).("a."/~/"!=b."/~).concat("||")})) |
>B10.fetch("comp.btx") |
在 C7-C10 的 fetch 中加入输出文件名。
执行后就得到了这几个结果文件。
无论是跨库,还是跨其他数据源,SPL 就都能很容易完成了。