怎样用 esProc 实现多数据库表的数据合并运算
由于业务需要将数据按年存储在两个结构相同的数据库中,要进行数据统计就会涉及多库混合计算。通过数据库或硬编码实现都比较麻烦,借助 esProc 可以简化这类运算。
数据
orders 表结构:

其中 order_id 是主键。
数据样例:
10001	16	2024-12-14	116	Product116	9	12.84	115.56	Credit Card	984 Example St, City 2	Pending	2025-04-23 01:21:08	2025-04-23 01:21:08
10002	11	2024-08-18	116	Product116	5	25.71	128.55	PayPal	841 Example St, City 1	Shipped	2025-04-23 01:21:08	2025-04-23 01:21:08
10003	20	2024-08-08	109	Product109	2	13.23	26.46	PayPal	676 Example St, City 4	Pending	2025-04-23 01:21:08	2025-04-23 01:21:08
10127	7	2024-10-12	113	Product113	4	20.64	82.56	Cash	145 Example St, City 4	Delivered	2025-04-23 01:21:08	2025-04-23 01:21:08
10190	19	2024-06-02	110	Product110	7	88.55	619.85	PayPal	289 Example St, City 2	Pending	2025-04-23 01:21:08	2025-04-23 01:21:08
 
 
现在要合并计算两个库的数据。用 esProc 怎么做呢?
安装 esProc
先通过https://www.esproc.com/download-esproc/ 下载 esProc 标准版。
安装后,配 MySQL 数据库连接。
先把 MySQL JDBC 驱动包放到 [esProc 安装目录]\common\jdbc 目录下(其他数据库类似)。

然后启动 esProc IDE,菜单栏选择 Tool-Connect to Data Source,配置 MySQL 标准 JDBC 连接。

同样的方式配置 bytedbb 库的数据源 dbb。测试一下连接,点击 Connect,发现刚刚配置的两个数据源变成粉红色证明连接成功。

测试一下,按 ctrl+F9 执行脚本,可以正常查询数据说明配置没问题

混算
下面把两个表的数据合并一起计算。
A  |  
  |
1  |  
   =connect("dba")  |  
  
2  |  
   =A1.query@x("select * from orders")  |  
  
3  |  
   =connect("dbb")  |  
  
4  |  
   =A3.query@x("select * from orders")  |  
  
5  |  
   =A2|A4  |  
  
6  |  
   =A5.groups(product_id;sum(total_amount):tamt)  |  
  
A2 和 A4 分别查询两个库的 orders 数据,@x 选项表示查询后关闭连接。A5 使用“|”符号合并两部分数据,就这么简单。然后 A6 基于合并结果进行后续计算(这里是分组汇总)。
点击某个单元格(如合并数据的 A5),可以看到该步骤的计算结果。

但是我们发现两个库的数据有重复,需要去重后再计算。
A  |  
  |
1  |  
   =connect("dba")  |  
  
2  |  
   =A1.query@x("select * from orders")  |  
  
3  |  
   =connect("dbb")  |  
  
4  |  
   =A3.query@x("select * from orders")  |  
  
5  |  
   =A2|A4  |  
  
6  |  
   =A5.group@1(order_id)  |  
  
7  |  
   =A6.groups(product_id;sum(total_amount):tamt)  |  
  
A6 使用 group@1 对 order_id 分组且只保留分组中的第一条记录,这样就去除了重复。如果想根据条件保留记录(比如时间最近的),就可以先排序再 group@1,很灵活。
查看 A6 的结果,发现原来两表中都包含 id 是 10001,10002 等重复数据都去除了,A7 再进行分组汇总。

后续的计算仍然跟单表一样,如果想做其他计算只需要换个计算表达式就可以。
能做混合计算,也就能顺便解决数据比对任务。比如查找两库都有的订单、仅存在一个库的订单等。
A  |  
  |
1  |  
   =connect("dba")  |  
  
2  |  
   =A1.query@x("select * from orders")  |  
  
3  |  
   =connect("dbb")  |  
  
4  |  
   =A3.query@x("select * from orders")  |  
  
5  |  
   =join@f(A2:a,order_id;A4:b,order_id)  |  
  
6  |  
   =A5.select(a && b)  |  
  
7  |  
   =A5.select(!b).(a)  |  
  
8  |  
   =A5.select(!a).(b)  |  
  
9  |  
   =A5.select(a && b && (${A2.fname().("a."/~/"!=b."/~).concat("||")}))  |  
  
这里 A5 使用 join 做全连接,A6 筛选重复订单(交集),A7 和 A8 分别筛选不重复订单(差集),A9 筛选 order_id 重复但其他列不同的记录,这里用到宏来简化书写,${A2.fname().("~(1)."/~/"!=~(2)."/~).concat("||") 展开后是这样:~(1).quantity != ~(2).quantity || ~(1).unit_price != ~(2).unit_price || ~(1).total_amount != ~(2).total_amount || ~(1).order_status != ~(2).order_status
运行后可以看到比对结果:

大数据情况
如果数据量比较大不能把数据全部读入内存,就需要使用 esProc 的游标机制完成混合计算。
如果不需要去重,简单把把两个游标合并到一起计算就行:
A  |  
  |
1  |  
   =connect("dba")  |  
  
2  |  
   =A1.cursor@x("select * from orders")  |  
  
3  |  
   =connect("dbb")  |  
  
4  |  
   =A3.cursor@x("select * from orders")  |  
  
5  |  
   =[A2,A4].conj()  |  
  
6  |  
   =A5.groups(product_id;sum(total_amount):tamt)  |  
  
A2 和 A4 使用 cursor 函数查询数据,在 A5 中合并两个游标,在 A6 进行计算。整体跟全内存计算差别不大。
执行脚本, A5 返回的是游标对象,如果想查看里面的内容可以点击“load data”:

如果要先做去重,需要游标保持有序才能方便比较相邻数据。这里要在 SQL 中按 order_id 排序。
A  |  
  |
1  |  
   =connect("dba")  |  
  
2  |  
   =A1.cursor@x("select * from orders order by order_id")  |  
  
3  |  
   =connect("dbb")  |  
  
4  |  
   =A3.cursor@x("select * from orders order by order_id")  |  
  
5  |  
   =[A2,A4].merge@u(order_id)  |  
  
6  |  
   =A5.groups(product_id;sum(total_amount):tamt)  |  
  
归并有序游标需要使用 CS.merge() 函数,merge 提供了很多选项,@u 表示求并集,所以直接就去重了。还有 @i 表示交集,@d 表示差集。后续的计算就都一样了。
Merge 后返回的仍是游标(并不进行实质的计算):

执行到最后的分组汇总才开始计算并返回结果。

整体上大数据情况的计算过程与全内存时基本一致,可以有效降低使用门槛。
大数据的比对也可以做:
A  |  
   B  |  
  |
1  |  
   =connect("dba")  |  
   |
2  |  
   =A1.query("select column_name from information_schema.columns where table_schema ='bytedba'and table_name ='orders'")  |  
   |
3  |  
   =A1.cursor@x("select * from orders order by 1")  |  
   |
4  |  
   =connect("dbb")  |  
   |
5  |  
   =A4.cursor@x("select * from orders order by 1")  |  
   |
6  |  
   =joinx@f(A3:a,order_id;A5:b,order_id)  |  
   |
7  |  
   =A6.select(a && b)  |  
   =A7.fetch()  |  
  
8  |  
   =A6.select(!b).(a)  |  
   =A8.fetch()  |  
  
9  |  
   =A6.select(!a).(b)  |  
   =A9.fetch()  |  
  
10  |  
   =A5.select(a && b && (${A2.(#1).("a."/~/"!=b."/~).concat("||")}))  |  
   =A10.fetch()  |  
  
由于 A6-A10 返回的都是游标,所以需要在 B7-B10 上增加结果集函数来执行计算并获取结果。
但奇怪的是只有 B7 有结果,B8-B10 都是空的。

这是因为游标是一次性的,一次遍历完就结束了,后面的计算也就没法再进行了。这时就要借助 esProc 提供的游标复用(管道)机制,大数据情况下一次遍历完成多个计算。我们来改造一下代码:
A  |  
   B  |  
   C  |  
  |
1  |  
   =connect("dba")  |  
   ||
2  |  
   =A1.query("select column_name from information_schema.columns where table_schema ='bytedba'and table_name ='orders'")  |  
   ||
3  |  
   =A1.cursor@x("select * from orders order by 1")  |  
   ||
4  |  
   =connect("dbb")  |  
   ||
5  |  
   =A4.cursor@x("select * from orders order by 1")  |  
   ||
6  |  
   =joinx@f(A3:a,order_id;A5:b,order_id)  |  
   ||
7  |  
   cursor A6  |  
   =A7.select(a && b)  |  
   =B7.fetch()  |  
  
8  |  
   cursor  |  
   =A8.select(!b).(a)  |  
   =B8.fetch()  |  
  
9  |  
   cursor  |  
   =A9.select(!a).(b)  |  
   =B9.fetch()  |  
  
10  |  
   cursor  |  
   =A10.select(a && b && (${A2.(#1).("a."/~/"!=b."/~).concat("||")}))  |  
   =B10.fetch()  |  
  
A7-A10 基于 A5 的游标创建管道(A8-A10 是省略写法),剩下 B7-C10 的运算跟上面就完全一样了。运行后我们可以在 A7-A10(注意不是 C7-C10)格看到计算后的结果。

如果计算结果也比较大没法全内存,还可以将结果输出到文件。再改造一下上面的代码:
A  |  
   B  |  
   C  |  
  |
1  |  
   =connect("dba")  |  
   ||
2  |  
   =A1.query("select column_name from information_schema.columns where table_schema ='bytedba'and table_name ='orders'")  |  
   ||
3  |  
   =A1.cursor@x("select * from orders order by 1")  |  
   ||
4  |  
   =connect("dbb")  |  
   ||
5  |  
   =A4.cursor@x("select * from orders order by 1")  |  
   ||
6  |  
   =joinx@f(A3:a,order_id;A5:b,order_id)  |  
   ||
7  |  
   cursor A6  |  
   =A7.select(a && b)  |  
   >B7.fetch("intersec.btx")  |  
  
8  |  
   cursor  |  
   =A8.select(!b).(a)  |  
   >B8.fetch("diff_a.btx")  |  
  
9  |  
   cursor  |  
   =A9.select(!a).(b)  |  
   >B9.fetch("diff_b.btx")  |  
  
10  |  
   cursor  |  
   =A10.select(a && b && (${A2.(#1).("a."/~/"!=b."/~).concat("||")}))  |  
   >B10.fetch("comp.btx")  |  
  
在 C7-C10 的 fetch 中加入输出文件名。

执行后就得到了这几个结果文件。

涉及跨库的混合计算用 esProc 就都能搞定了。
esProc 脚本可以很方便地嵌入到 Java 应用中,部署集成步骤可参考官网的文档。
            
        

英文版