怎样用 esProc 实现多数据库表的数据合并运算

由于业务需要将数据按年存储在两个结构相同的数据库中,要进行数据统计就会涉及多库混合计算。通过数据库或硬编码实现都比较麻烦,借助 esProc 可以简化这类运算。

数据

orders 表结构:

..

其中 order_id 是主键。

数据样例:

10001	16	2024-12-14	116	Product116	9	12.84	115.56	Credit Card	984 Example St, City 2	Pending	2025-04-23 01:21:08	2025-04-23 01:21:08
10002	11	2024-08-18	116	Product116	5	25.71	128.55	PayPal	841 Example St, City 1	Shipped	2025-04-23 01:21:08	2025-04-23 01:21:08
10003	20	2024-08-08	109	Product109	2	13.23	26.46	PayPal	676 Example St, City 4	Pending	2025-04-23 01:21:08	2025-04-23 01:21:08
10127	7	2024-10-12	113	Product113	4	20.64	82.56	Cash	145 Example St, City 4	Delivered	2025-04-23 01:21:08	2025-04-23 01:21:08
10190	19	2024-06-02	110	Product110	7	88.55	619.85	PayPal	289 Example St, City 2	Pending	2025-04-23 01:21:08	2025-04-23 01:21:08

现在要合并计算两个库的数据。用 esProc 怎么做呢?

安装 esProc

先通过https://www.esproc.com/download-esproc/ 下载 esProc 标准版。

安装后,配 MySQL 数据库连接。

先把 MySQL JDBC 驱动包放到 [esProc 安装目录]\common\jdbc 目录下(其他数据库类似)。

..

然后启动 esProc IDE,菜单栏选择 Tool-Connect to Data Source,配置 MySQL 标准 JDBC 连接。

..

同样的方式配置 bytedbb 库的数据源 dbb。测试一下连接,点击 Connect,发现刚刚配置的两个数据源变成粉红色证明连接成功。

..

测试一下,按 ctrl+F9 执行脚本,可以正常查询数据说明配置没问题

..

混算

下面把两个表的数据合并一起计算。


A

1

=connect("dba")

2

=A1.query@x("select * from orders")

3

=connect("dbb")

4

=A3.query@x("select * from orders")

5

=A2|A4

6

=A5.groups(product_id;sum(total_amount):tamt)

A2 和 A4 分别查询两个库的 orders 数据,@x 选项表示查询后关闭连接。A5 使用“|”符号合并两部分数据,就这么简单。然后 A6 基于合并结果进行后续计算(这里是分组汇总)。

点击某个单元格(如合并数据的 A5),可以看到该步骤的计算结果。

..

但是我们发现两个库的数据有重复,需要去重后再计算。


A

1

=connect("dba")

2

=A1.query@x("select * from orders")

3

=connect("dbb")

4

=A3.query@x("select * from orders")

5

=A2|A4

6

=A5.group@1(order_id)

7

=A6.groups(product_id;sum(total_amount):tamt)

A6 使用 group@1 对 order_id 分组且只保留分组中的第一条记录,这样就去除了重复。如果想根据条件保留记录(比如时间最近的),就可以先排序再 group@1,很灵活。

查看 A6 的结果,发现原来两表中都包含 id 是 10001,10002 等重复数据都去除了,A7 再进行分组汇总。

..

后续的计算仍然跟单表一样,如果想做其他计算只需要换个计算表达式就可以。

能做混合计算,也就能顺便解决数据比对任务。比如查找两库都有的订单、仅存在一个库的订单等。


A

1

=connect("dba")

2

=A1.query@x("select * from orders")

3

=connect("dbb")

4

=A3.query@x("select * from orders")

5

=join@f(A2:a,order_id;A4:b,order_id)

6

=A5.select(a && b)

7

=A5.select(!b).(a)

8

=A5.select(!a).(b)

9

=A5.select(a && b && (${A2.fname().("a."/~/"!=b."/~).concat("||")}))

这里 A5 使用 join 做全连接,A6 筛选重复订单(交集),A7 和 A8 分别筛选不重复订单(差集),A9 筛选 order_id 重复但其他列不同的记录,这里用到宏来简化书写,${A2.fname().("~(1)."/~/"!=~(2)."/~).concat("||") 展开后是这样:~(1).quantity != ~(2).quantity || ~(1).unit_price != ~(2).unit_price || ~(1).total_amount != ~(2).total_amount || ~(1).order_status != ~(2).order_status

运行后可以看到比对结果:

..

大数据情况

如果数据量比较大不能把数据全部读入内存,就需要使用 esProc 的游标机制完成混合计算。

如果不需要去重,简单把把两个游标合并到一起计算就行:


A

1

=connect("dba")

2

=A1.cursor@x("select * from orders")

3

=connect("dbb")

4

=A3.cursor@x("select * from orders")

5

=[A2,A4].conj()

6

=A5.groups(product_id;sum(total_amount):tamt)

A2 和 A4 使用 cursor 函数查询数据,在 A5 中合并两个游标,在 A6 进行计算。整体跟全内存计算差别不大。

执行脚本, A5 返回的是游标对象,如果想查看里面的内容可以点击“load data”:

..

如果要先做去重,需要游标保持有序才能方便比较相邻数据。这里要在 SQL 中按 order_id 排序。


A

1

=connect("dba")

2

=A1.cursor@x("select * from orders")

3

=connect("dbb")

4

=A3.cursor@x("select * from orders")

5

=[A2,A4].merge@u(order_id)

6

=A5.groups(product_id;sum(total_amount):tamt)

归并有序游标需要使用 CS.merge() 函数,merge 提供了很多选项,@u 表示求并集,所以直接就去重了。还有 @i 表示交集,@d 表示差集。后续的计算就都一样了。

Merge 后返回的仍是游标(并不进行实质的计算):

..

执行到最后的分组汇总才开始计算并返回结果。

..

整体上大数据情况的计算过程与全内存时基本一致,可以有效降低使用门槛。

大数据的比对也可以做:


A

B

1

=connect("dba")


2

=A1.query("select column_name from information_schema.columns where table_schema ='bytedba'and table_name ='orders'")


3

=A1.cursor@x("select * from orders order by 1")


4

=connect("dbb")


5

=A4.cursor@x("select * from orders order by 1")


6

=joinx@f(A3:a,order_id;A5:b,order_id)


7

=A6.select(a && b)

=A7.fetch()

8

=A6.select(!b).(a)

=A8.fetch()

9

=A6.select(!a).(b)

=A9.fetch()

10

=A5.select(a && b && (${A2.(#1).("a."/~/"!=b."/~).concat("||")}))

=A10.fetch()

由于 A6-A10 返回的都是游标,所以需要在 B7-B10 上增加结果集函数来执行计算并获取结果。

但奇怪的是只有 B7 有结果,B8-B10 都是空的。

..

这是因为游标是一次性的,一次遍历完就结束了,后面的计算也就没法再进行了。这时就要借助 esProc 提供的游标复用(管道)机制,大数据情况下一次遍历完成多个计算。我们来改造一下代码:


A

B

C

1

=connect("dba")



2

=A1.query("select column_name from information_schema.columns where table_schema ='bytedba'and table_name ='orders'")



3

=A1.cursor@x("select * from orders order by 1")



4

=connect("dbb")



5

=A4.cursor@x("select * from orders order by 1")



6

=joinx@f(A3:a,order_id;A5:b,order_id)



7

cursor A6

=A7.select(a && b)

=B7.fetch()

8

cursor

=A8.select(!b).(a)

=B8.fetch()

9

cursor

=A9.select(!a).(b)

=B9.fetch()

10

cursor

=A10.select(a && b && (${A2.(#1).("a."/~/"!=b."/~).concat("||")}))

=B10.fetch()

A7-A10 基于 A5 的游标创建管道(A8-A10 是省略写法),剩下 B7-C10 的运算跟上面就完全一样了。运行后我们可以在 A7-A10(注意不是 C7-C10)格看到计算后的结果。

..

如果计算结果也比较大没法全内存,还可以将结果输出到文件。再改造一下上面的代码:


A

B

C

1

=connect("dba")



2

=A1.query("select column_name from information_schema.columns where table_schema ='bytedba'and table_name ='orders'")



3

=A1.cursor@x("select * from orders order by 1")



4

=connect("dbb")



5

=A4.cursor@x("select * from orders order by 1")



6

=joinx@f(A3:a,order_id;A5:b,order_id)



7

cursor A6

=A7.select(a && b)

>B7.fetch("intersec.btx")

8

cursor

=A8.select(!b).(a)

>B8.fetch("diff_a.btx")

9

cursor

=A9.select(!a).(b)

>B9.fetch("diff_b.btx")

10

cursor

=A10.select(a && b && (${A2.(#1).("a."/~/"!=b."/~).concat("||")}))

>B10.fetch("comp.btx")

在 C7-C10 的 fetch 中加入输出文件名。

..

执行后就得到了这几个结果文件。

..

涉及跨库的混合计算用 esProc 就都能搞定了。

esProc 脚本可以很方便地嵌入到 Java 应用中,部署集成步骤可参考官网的文档。