SPL 实践:万亿计算量时空碰撞三分钟搞定

问题描述

时空碰撞定义

数据集A中有n个源对象A1,…,An的时空信息,每条信息有三个属性,分别是IDiA位置(lA时间(tA),可以认为同一个AiA中不会同时出现两次,即没有两条信息的iAtA都相同。与A同结构的数据集B中有m个待确认是否与A发生碰撞的目标对象B1,…,Bm(对应属性分别记为iB,lB,tB),也同样认为Bi不会在同一时刻出现两次。

本文涉及很多集合运算,我们不使用术语“记录”表示数据集中的信息,而使用“成员”这个集合相关的术语。

AiA分组得到n个子集,仍用A1,…,An命名。对应地,B可以拆成为m个子集B1,..,Bm。对任何a属于Aib属于Bj,如果a.lA=b.lB|a.tA-b.tB|<= 1分钟(该范围可变),则称a b发生碰撞,并认定对象Ai和对象Bj1次碰撞。

规则1: Ai中每个成员最多记1次碰撞,即如果ab1b2都有碰撞,只认定AiBj1次碰撞

规则2: Bj中发生过碰撞的成员不再重复认定碰撞,即如果a1a2b都有碰撞,假定a1.tA<a2.tA,则只认定a1b发生碰撞, a2b不再被认定发生碰撞

目标:对每个Ai,找出相似度最高的前10个对象Bj

其中相似度r计算公式为:r(Ai,Bj)= E/U

分子E是上述规则下统计出来的AiBj发生碰撞的总次数。

分母UAiBj去重成员总数,可以用|Ai|+|Bj|-E’计算,其中E’Bj中和某Ai成员发生碰撞的成员数量。

数据结构与规模

数据集A

字段名称

字段含义

数据类型

示例数据

iA

ID

String

A0001

lA

位置

Long

10001

tA

时间

Datetime

2023-08-28 10:00:00

数据集B

字段名称

字段含义

数据类型

示例数据

iB

ID

String

B0001

lB

位置

Long

10001

tB

时间

Datetime

2023-08-28 10:00:00

tA,tB精确到秒,时间跨度为30天。

A的总记录数为2100万行,日增量约为70万。

B的总记录数为1500万行,日增量约为50万。

n的规模(Ai的数量)为250万,m的规模(Bj的数量)为150万。

位置的数量为10,即lAlB的取值可能性。

环境和期望

40C128G的服务器上,期望15分钟内计算出结果。

数据量不大,可以全部装入内存数据库,但计算过程比较复杂,单纯用SQL很难写出来,还需要借助外部程序(JavaPython),整体性能很低下,计算任务执行了两个小时以上。

问题分析

n250万,m150万,如果按定义计算每对AiBj的相似度,要计算250*150=3.75万亿个。就算每个CPU仅用1微秒算一个(实际上这些复杂的集合运算不可能这么快),目前的多CPU环境也要数小时才能完成,显然不能用这种硬遍历的方法。

对于一对AiBj,根据相似度计算公式,若其成员之间没有发生碰撞,则E=0,相似度也是0,也不必再参与后续TopN的计算了。

假设A中数据分布均衡,平均每个Ai的成员数不到102100/250万),而与Ai能发生碰撞的Bj要满足|tA-tB|<=1分钟。假设B中数据分布均衡,则平均每分钟约350条(50/1440)成员, Ai10个成员最多和350*2*10=7000个(每个A成员的前后各一分钟之间)B成员发生碰撞,Bj平均成员数也是101500/150万),这7000B成员分布到的Bj个数平均只有7000/10=700个,也就是说,与Ai相似度不是0Bj平均只有700个,远小于150万的Bj总数(差2000多倍)。再考虑到lA=lB的条件,如果所有对象的位置也能均衡分布(可能性不大),那与Ai相似度不为0Bj平均数量还可能再减少10倍(10个位置)。

根据这些信息,我们设计下面的算法:

1. 对于每个Ai,用时间和位置条件找出可能与Ai每个成员发生碰撞的B成员的集合,记为B’,即

B’=Ai.conj(B.select(tB>=tA-60 && tB<=tA+60 && lA==lB))

注意,B’计算过程中将对每个Ai成员过滤出相应的B成员,AiB的成员在B’中都可能会有重复。

2. 对每个Ai都要用tBB过滤计算出B’,如果事先将BtB排序后,就可以用二分法提速。同时把AitA属性拼上去方便后续计算(lAlB总是相同,iA对于Ai是定值,都不用拼了),B’可以改为:

B’=Ai.conj(B.select@b(tB>=tA-60 && tB<=tA+60).select(lA==lB).derive(Ai.tA))

3. B’iB分组,得到一组B’jBj所有可能与Ai成员发生碰撞的成员都在B’j里,所以AiB’j的碰撞次数也就是AiBj的碰撞次数。

B’j中如果出现相同的b,说明有多个aAi成员)和b都有碰撞,按规则2只能保留tA最小的a,令

A’j=B’j.group(tB).(~.minp(tA))

即是认定了碰撞的Ai成员和B’j成员对构成的集合(也是认定了碰撞的Ai成员和Bj成员对),其中Ai成员由字段tA标识,Bj成员用字段tB标识。

4. 如果事先把AiA,tA排序后,分组后的Ai成员也对tA有序,那么B’成员也会对tA有序,进而B’j的成员也对tA有序,这样B’j.group(tB)的每个分组子集中tA最小的一定是第一个成员,这时A’j的计算可以简化为

A’j=B’j.group@1(tB)

5. 根据规则1Ai中每个成员最多记一次碰撞,按之前约定,同一个AiA中不会同时出现两次,那么只要tA去重即可,即分子

E=A’.icount(tA)

6. 分母U =|Ai|+|Bj|-E’

A’j是认定了碰撞的Ai成员和Bj成员对集合,而在前面计算中已对Bj成员做了去重处理(group@1(tB)在每个分组子集只取一个成员),也就是说在A’j中,Bj成员不会重复。那么,A’j成员和发生碰撞的Bj成员能一一对应,则有E’=|A’j|

7. |Ai||Bj|可以事先对ABiAiB分组计算出来保存起来,因为数据量不大,都可以保存在内存中。

最后按照公式E/U得到AiBj的相似度r,剩下就是针对该值计算TopN的常规任务了。

进一步优化

8. 上述步骤1中,对每个a找全量数据集B,用了二分法仍然一定的计算量(2*log1500万也要比较的50次)。考虑到分钟数和位置数都不多(30天,每天1440分钟和10个位置也才40几万),内存可以放下,可以用对位序列来直接定位。

计算时,令

G=B.align@a(30*1440,tB\60+1).(~.align@a(10,lB))

因为有时间和位置两个维度,这里也使用了二层的对位序列。先将B按分钟序号tB\60+1分成4320030*1440)个组,组内按位置序号lB再分成10组。对于Ai中某个成员的tA,可以简单地用

G’=G.m(tA\60)(lA) | G(tA\60+1)(lA) | G.m(tA\60+2)(lA)

迅速粗筛一个B'的小超集,其成员的tBtA差距在最多不超过2分钟(tAtB所在的分钟序号相差不大于1),从而过滤掉大量时间维上不可能碰撞的B成员。这个小超集中可能有少量成员的tBtA的差距大于1分钟,要再对其用

G’.select@b(tB>=tA-60 && tB<=tA+60)

筛出精确的B’,这时候计算量就少了很多。

9. 另外,计算U时,要用iA查找到对应的|Ai|,如果iA是连续整数,也可以直接用位置访问到,避免查找动作。即令

nA=A.groups@n(iA;count(1)).(#2)

这样 |Ai|=nA(iA)B可以类似处理。

实践过程

准备实验数据

这里直接准备序号化过的数据,设时间跨度为30天,位置的枚举数量为10,模拟数据脚本如下:


A

1

=K=30,nA=K*700000,nB=K*500000,t=K*86400,LN=10,rand@s(1)

2

=file("A.ctx").create@yp(#iA,tA,lA).append(nA.new(int(rand(2500000)+1):iA,int(rand(t)):tA,int(rand(LN)+1):lA).sort(iA,tA))

3

=file("B.ctx").create@y(iB,tB,lB).append(nB.new(int(rand(1500000)+1):iB, int(rand(t)):tB, int(rand(LN)+1):lB).sort(tB,iB))

A1K是天数,nA是数据集A的总数据量,nB是数据集B的总数据量,t30天的秒数,LN是位置的枚举数量。

A2A3分别建立A.ctxB.ctx组表,并将随机生成的数据集AB的数据导出到组表文件。

其中tAtB)是从起始时间点开始计算的秒数,例如起始时间点为2023-08-23 00:00:00,则2023-08-23 00:01:01时刻对应的值为61

A2建立组表时用了@p选项,表示按第一个字段iA作为分段键。并行计算时需要对组表分段,不能把相同iA的记录分到两段,使用@p选项可以在组表分段时保证这一点

特别注意,保存AB时的排序不同,AiA,tA排序(分析过程的第4步),BtB,iB排序(分析过程的第2步),后续直接读出有序数据。

计算脚本


A

1

=K=30,LN=10

2

=A=file("A.ctx").open().cursor()

3

=B=file("B.ctx").open().import()

4

=nA=A.groups@n(iA;count(1)).(#2)

5

=nB=B.groups@n(iB;count(1)).(#2)

6

=G=B.align@a(K*1440,tB\60+1).(~.align@a(LN,lB))

7

=A=file("A.ctx").open().cursor()

8

=A7.group(iA).(~.news((G.m(tA\60)(lA)|gG(tA\60+1)(lA)|G.m(tA\60+2)(lA)).select@b(tB>=tA-60 && tB<=tA+60);iA,iB,tA,tB))

9

=A8.(~.group@1(iB,tB).group(iB))

10

=A9.conj(~.new(~.iA,~.iB,~.icount(tA)/(nB(iB)+nA(iA)-~.len()):r).top(-10;r))

11

=file("result.csv").export@ct(A10)

A1 K=30,代表统计时间跨度的天数;LN=10,代表位置的枚举数量。

A4A5分析过程的第7步,分别按数据集ABID分组计数,统计出每个AiBj的成员数,存储成序列

A6分析过程中的第8步,把数据集B按分钟对齐分组,组内再按位置对齐分组,即计算出前面分析中说的对位序列。

A8 分析过程的第128步,把数据集AiA分组成若干Ai,循环每个Ai,计算出对应的B’。这里没有用conj而用了news,省去derive动作,结果是等价的。另外,这里要把iA也拼上去以方便后续寻找|Ai|

A9 分析过程中的第34步,计算A’j

B’.group(iB).group@1(tB)

简化成

B’.group@1(iB,tB)

A10分析过程的第67步, Aj’tA去重计数的结果就是分子E,即分析过程中的第5步。用前面计算的|Ai||Bi|相加减去当前组内记录数(即E’)就是分母U。最后找出r最大的前10条记录。

序号化及还原

ID、位置、时间序号化(时间的序号化为统计起始时间开始到当前时间的秒数),转换后的数据结构如下

数据集A

字段名称

字段含义

数据类型

示例数据

iA

ID

Int

2048986

lA

位置

Int

8

tA

时间

Int

628588

数据集B

字段名称

字段含义

数据类型

示例数据

iB

ID

Int

1050621

lB

位置

Int

5

tB

时间

Int

1802419

以上代码在造数据时,所有字段已按上述结构序号化生成数据。实际使用时,先要做转换整理,完成计算后还要还原。具体方法可以参考数据转存时的整数化中讲到的方法,不是算法的重点,这里不再赘述。

实际效果

SPL使用单机(8C64G),计算总时间跨度30天(数据集A的数据量为2100万,数据集B的数据量为1500万),将全部结果导出至CSV文件,耗时为161秒。

实际上,达到这个性能还会少量使用SPL企业版中的列式运算选项,但因为不涉及原理分析,这里就不详述了

后记

这是个典型的对象统计问题,这类问题一般有如下几个特点:

1. 统计满足某种条件的对象的个数

2. 对象的数量非常多,但每个对象涉及的数据量不多

3. 条件非常复杂,通常还和次序有关,需要一定的步骤才能判断出来

通常这种问题是把数据按对象排序,但本例的数据量很小,存储上的优化不重要。这里的关键在于需要强大的集合运算特别是有序集合运算的能力,比如数据类型可以支持集合的集合,从而可以保持分组子集而不像SQL那样要强制聚合,还可以支持二层的对位序列,可以用位置访问集合的成员,以及提供有序分组。