SPL 实践:万亿计算量时空碰撞三分钟搞定
问题描述
时空碰撞定义
数据集A中有n个源对象A1,…,An的时空信息,每条信息有三个属性,分别是ID(iA)、位置(lA)、时间(tA),可以认为同一个Ai在A中不会同时出现两次,即没有两条信息的iA和tA都相同。与A同结构的数据集B中有m个待确认是否与A发生碰撞的目标对象B1,…,Bm(对应属性分别记为iB,lB,tB),也同样认为Bi不会在同一时刻出现两次。
本文涉及很多集合运算,我们不使用术语“记录”表示数据集中的信息,而使用“成员”这个集合相关的术语。
将A按iA分组得到n个子集,仍用A1,…,An命名。对应地,B可以拆成为m个子集B1,..,Bm。对任何a属于Ai和b属于Bj,如果a.lA=b.lB且|a.tA-b.tB|<= 1分钟(该范围可变),则称a和 b发生碰撞,并认定对象Ai和对象Bj有1次碰撞。
规则1: Ai中每个成员最多记1次碰撞,即如果a和b1和b2都有碰撞,只认定Ai和Bj有1次碰撞。
规则2: Bj中发生过碰撞的成员不再重复认定碰撞,即如果a1和a2和b都有碰撞,假定a1.tA<a2.tA,则只认定a1和b发生碰撞, a2和b不再被认定发生碰撞。
目标:对每个Ai,找出相似度最高的前10个对象Bj。
其中相似度r计算公式为:r(Ai,Bj)= E/U。
分子E是上述规则下统计出来的Ai与Bj发生碰撞的总次数。
分母U是Ai与Bj去重成员总数,可以用|Ai|+|Bj|-E’计算,其中E’是Bj中和某Ai成员发生碰撞的成员数量。
数据结构与规模
数据集A
字段名称 |
字段含义 |
数据类型 |
示例数据 |
iA |
ID |
String |
A0001 |
lA |
位置 |
Long |
10001 |
tA |
时间 |
Datetime |
2023-08-28 10:00:00 |
数据集B
字段名称 |
字段含义 |
数据类型 |
示例数据 |
iB |
ID |
String |
B0001 |
lB |
位置 |
Long |
10001 |
tB |
时间 |
Datetime |
2023-08-28 10:00:00 |
tA,tB精确到秒,时间跨度为30天。
A的总记录数为2100万行,日增量约为70万。
B的总记录数为1500万行,日增量约为50万。
n的规模(Ai的数量)为250万,m的规模(Bj的数量)为150万。
位置的数量为10,即lA和lB的取值可能性。
环境和期望
40C128G的服务器上,期望15分钟内计算出结果。
数据量不大,可以全部装入内存数据库,但计算过程比较复杂,单纯用SQL很难写出来,还需要借助外部程序(Java或Python),整体性能很低下,计算任务执行了两个小时以上。
问题分析
n有250万,m有150万,如果按定义计算每对Ai和Bj的相似度,要计算250万*150万=3.75万亿个。就算每个CPU仅用1微秒算一个(实际上这些复杂的集合运算不可能这么快),目前的多CPU环境也要数小时才能完成,显然不能用这种硬遍历的方法。
对于一对Ai和Bj,根据相似度计算公式,若其成员之间没有发生碰撞,则E=0,相似度也是0,也不必再参与后续TopN的计算了。
假设A中数据分布均衡,平均每个Ai的成员数不到10(2100万/250万),而与Ai能发生碰撞的Bj要满足|tA-tB|<=1分钟。假设B中数据分布均衡,则平均每分钟约350条(50万/1440)成员, Ai这10个成员最多和350*2*10=7000个(每个A成员的前后各一分钟之间)B成员发生碰撞,Bj平均成员数也是10(1500万/150万),这7000个B成员分布到的Bj个数平均只有7000/10=700个,也就是说,与Ai相似度不是0的Bj平均只有700个,远小于150万的Bj总数(差2000多倍)。再考虑到lA=lB的条件,如果所有对象的位置也能均衡分布(可能性不大),那与Ai相似度不为0的Bj平均数量还可能再减少10倍(10个位置)。
根据这些信息,我们设计下面的算法:
1. 对于每个Ai,用时间和位置条件找出可能与Ai每个成员发生碰撞的B成员的集合,记为B’,即
B’=Ai.conj(B.select(tB>=tA-60 && tB<=tA+60 && lA==lB))
注意,B’计算过程中将对每个Ai成员过滤出相应的B成员,Ai和B的成员在B’中都可能会有重复。
2. 对每个Ai都要用tB对B过滤计算出B’,如果事先将B按tB排序后,就可以用二分法提速。同时把Ai的tA属性拼上去方便后续计算(lA和lB总是相同,iA对于Ai是定值,都不用拼了),B’可以改为:
B’=Ai.conj(B.select@b(tB>=tA-60 && tB<=tA+60).select(lA==lB).derive(Ai.tA))
3. 对B’按iB分组,得到一组B’j,Bj所有可能与Ai成员发生碰撞的成员都在B’j里,所以Ai和B’j的碰撞次数也就是Ai和Bj的碰撞次数。
B’j中如果出现相同的b,说明有多个a(Ai成员)和b都有碰撞,按规则2只能保留tA最小的a,令
A’j=B’j.group(tB).(~.minp(tA))
即是认定了碰撞的Ai成员和B’j成员对构成的集合(也是认定了碰撞的Ai成员和Bj成员对),其中Ai成员由字段tA标识,Bj成员用字段tB标识。
4. 如果事先把A按iA,tA排序后,分组后的Ai成员也对tA有序,那么B’成员也会对tA有序,进而B’j的成员也对tA有序,这样B’j.group(tB)的每个分组子集中tA最小的一定是第一个成员,这时A’j的计算可以简化为
A’j=B’j.group@1(tB)
5. 根据规则1,Ai中每个成员最多记一次碰撞,按之前约定,同一个Ai在A中不会同时出现两次,那么只要对tA去重即可,即分子
E=A’.icount(tA)
6. 分母U =|Ai|+|Bj|-E’中
A’j是认定了碰撞的Ai成员和Bj成员对集合,而在前面计算中已对Bj成员做了去重处理(group@1(tB)在每个分组子集只取一个成员),也就是说在A’j中,Bj成员不会重复。那么,A’j成员和发生碰撞的Bj成员能一一对应,则有E’=|A’j|。
7. |Ai|和|Bj|可以事先对A和B按iA和iB分组计算出来保存起来,因为数据量不大,都可以保存在内存中。
最后按照公式E/U得到Ai与Bj的相似度r,剩下就是针对该值计算TopN的常规任务了。
进一步优化
8. 上述步骤1中,对每个a找全量数据集B,用了二分法仍然一定的计算量(2*log1500万也要比较的50次)。考虑到分钟数和位置数都不多(30天,每天1440分钟和10个位置也才40几万),内存可以放下,可以用对位序列来直接定位。
计算时,令
G=B.align@a(30*1440,tB\60+1).(~.align@a(10,lB))
因为有时间和位置两个维度,这里也使用了二层的对位序列。先将B按分钟序号tB\60+1分成43200(30*1440)个组,组内按位置序号lB再分成10组。对于Ai中某个成员的tA,可以简单地用
G’=G.m(tA\60)(lA) | G(tA\60+1)(lA) | G.m(tA\60+2)(lA)
迅速粗筛一个B'的小超集,其成员的tB和tA差距在最多不超过2分钟(tA和tB所在的分钟序号相差不大于1),从而过滤掉大量时间维上不可能碰撞的B成员。这个小超集中可能有少量成员的tB与tA的差距大于1分钟,要再对其用
G’.select@b(tB>=tA-60 && tB<=tA+60)
筛出精确的B’,这时候计算量就少了很多。
9. 另外,计算U时,要用iA查找到对应的|Ai|,如果iA是连续整数,也可以直接用位置访问到,避免查找动作。即令
nA=A.groups@n(iA;count(1)).(#2)
这样 |Ai|=nA(iA)。B可以类似处理。
实践过程
准备实验数据
这里直接准备序号化过的数据,设时间跨度为30天,位置的枚举数量为10,模拟数据脚本如下:
A |
|
1 |
=K=30,nA=K*700000,nB=K*500000,t=K*86400,LN=10,rand@s(1) |
2 |
=file("A.ctx").create@yp(#iA,tA,lA).append(nA.new(int(rand(2500000)+1):iA,int(rand(t)):tA,int(rand(LN)+1):lA).sort(iA,tA)) |
3 |
=file("B.ctx").create@y(iB,tB,lB).append(nB.new(int(rand(1500000)+1):iB, int(rand(t)):tB, int(rand(LN)+1):lB).sort(tB,iB)) |
A1中K是天数,nA是数据集A的总数据量,nB是数据集B的总数据量,t是30天的秒数,LN是位置的枚举数量。
A2、A3分别建立A.ctx和B.ctx组表,并将随机生成的数据集A、B的数据导出到组表文件。
其中tA(tB)是从起始时间点开始计算的秒数,例如起始时间点为2023-08-23 00:00:00,则2023-08-23 00:01:01时刻对应的值为61。
A2在建立组表时用了@p选项,表示按第一个字段iA作为分段键。并行计算时需要对组表分段,不能把相同iA的记录分到两段,使用@p选项可以在组表分段时保证这一点。
特别注意,保存A、B时的排序不同,A按iA,tA排序(分析过程的第4步),B按tB,iB排序(分析过程的第2步),后续直接读出有序数据。
计算脚本
A |
|
1 |
=K=30,LN=10 |
2 |
=A=file("A.ctx").open().cursor() |
3 |
=B=file("B.ctx").open().import() |
4 |
=nA=A.groups@n(iA;count(1)).(#2) |
5 |
=nB=B.groups@n(iB;count(1)).(#2) |
6 |
=G=B.align@a(K*1440,tB\60+1).(~.align@a(LN,lB)) |
7 |
=A=file("A.ctx").open().cursor() |
8 |
=A7.group(iA).(~.news((G.m(tA\60)(lA)|gG(tA\60+1)(lA)|G.m(tA\60+2)(lA)).select@b(tB>=tA-60 && tB<=tA+60);iA,iB,tA,tB)) |
9 |
=A8.(~.group@1(iB,tB).group(iB)) |
10 |
=A9.conj(~.new(~.iA,~.iB,~.icount(tA)/(nB(iB)+nA(iA)-~.len()):r).top(-10;r)) |
11 |
=file("result.csv").export@ct(A10) |
A1 K=30,代表统计时间跨度的天数;LN=10,代表位置的枚举数量。
A4、A5分析过程的第7步,分别按数据集A、B的ID分组计数,统计出每个Ai、Bj的成员数,存储成序列
A6分析过程中的第8步,把数据集B按分钟对齐分组,组内再按位置对齐分组,即计算出前面分析中说的对位序列。
A8 分析过程的第1、2、8步,把数据集A按iA分组成若干Ai,循环每个Ai,计算出对应的B’。这里没有用conj而用了news,省去derive动作,结果是等价的。另外,这里要把iA也拼上去以方便后续寻找|Ai|。
A9 分析过程中的第3、4步,计算A’j 。
B’.group(iB).group@1(tB)
简化成
B’.group@1(iB,tB)
A10分析过程的第6、7步, Aj’对tA去重计数的结果就是分子E,即分析过程中的第5步。用前面计算的|Ai|与|Bi|相加减去当前组内记录数(即E’)就是分母U。最后找出r最大的前10条记录。
序号化及还原
将ID、位置、时间序号化(时间的序号化为统计起始时间开始到当前时间的秒数),转换后的数据结构如下
数据集A
字段名称 |
字段含义 |
数据类型 |
示例数据 |
iA |
ID |
Int |
2048986 |
lA |
位置 |
Int |
8 |
tA |
时间 |
Int |
628588 |
数据集B
字段名称 |
字段含义 |
数据类型 |
示例数据 |
iB |
ID |
Int |
1050621 |
lB |
位置 |
Int |
5 |
tB |
时间 |
Int |
1802419 |
以上代码在造数据时,所有字段已按上述结构序号化生成数据。实际使用时,先要做转换整理,完成计算后还要还原。具体方法可以参考数据转存时的整数化中讲到的方法,不是算法的重点,这里不再赘述。
实际效果
SPL使用单机(8C64G),计算总时间跨度30天(数据集A的数据量为2100万,数据集B的数据量为1500万),将全部结果导出至CSV文件,耗时为161秒。
实际上,达到这个性能还会少量使用SPL企业版中的列式运算选项,但因为不涉及原理分析,这里就不详述了。
后记
这是个典型的对象统计问题,这类问题一般有如下几个特点:
1. 统计满足某种条件的对象的个数
2. 对象的数量非常多,但每个对象涉及的数据量不多
3. 条件非常复杂,通常还和次序有关,需要一定的步骤才能判断出来
通常这种问题是把数据按对象排序,但本例的数据量很小,存储上的优化不重要。这里的关键在于需要强大的集合运算特别是有序集合运算的能力,比如数据类型可以支持集合的集合,从而可以保持分组子集而不像SQL那样要强制聚合,还可以支持二层的对位序列,可以用位置访问集合的成员,以及提供有序分组。
英文版