SPL 实践:搞死 MPP 的时空碰撞问题

问题描述

时空碰撞定义

某时间区间(例如7天)被分成多个固定时长(如15分钟)的时间切片,对象a和对象b在同一时间切片内的相同位置出现过,称为一次碰撞。

规则1:相同时间切片内,多次碰撞只记一次。

规则2:相同时间切片内,最后出现位置不同的称为不匹配,不匹配的时间切片数量不超过20 时,(包括其它时间切片的)碰撞才被认为有效。

要求:已知对象a,查找出指定时间区间内,满足两条规则下,与a发生有效碰撞次数最多的前20个对象b

数据结构与规模

单一数据表,每天的数据量约80亿条记录,每个对象平均1000条记录,每条记录存储对象的时空信息(对象标识、时间戳、空间标记),当时间区间为7天时,总数据量有560亿行,数据结构如下:

字段名称

字段类型

字段注释

示例数据

no

String

唯一对象标识

100000000009

ct

Int

时间戳,精确到秒

1690819200

lac

String

空间标记1

40000

ci

String

空间标记2

66000000

no由全数字构成。lacci总是一起出现,为了描述方便起见,我们可以把lacci并称为一个字段loc,已知loc去重计数的范围不超过27万。

环境和期望

564C256G服务器构成的集群下期望1小时内计算出结果,使用某世界知名MPP数据库无法达到预期。

问题分析

这个问题用关系数据库确实不容易快速计算,我们尝试用SQL写出不考虑规则2的运算:

WITH DT AS ( SELECT DISTINCT no, loc, int(ct/15分钟) as ct FROM T )
SELECT TOP 20 * FROM
   ( SELECT B.no, COUNT(DISTINCT B.loc) cnt
   FROM DT AS A JOIN DT AS B ON A.loc=B.loc AND A.ct=B.ct
   WHERE A.no=a AND B.no<>a
   GROUP BY B.no)
ORDER BY cnt DESC

SQL中的DISTINCTJOIN计算会涉及HASH和比对,数据量很大时计算量也会很大,都会严重拖累性能。而且这些运算都涉及随机访问,通常要在内存进行,数据量太大还要使用缓存,性能更会急剧下降甚至可能溢出。仅是规则1SQL计算已经很慢了,再加上规则2MPP算不出来也不奇怪了。

如果把对象ab在时间区间内的相关记录都取出成内存中的集合,然后来统计ab发生有效碰撞的次数,并不会很困难。每个对象涉及的记录数并不多,即使7天区间也不到1万条,内存放下毫无压力。

a的记录集合是Ab的是B,将A按时间切片分组为A1,…,AnB分为B1,…Bn。所有Ai,Bi内成员都按ct从小到大排序。

时间切片i内,a,b发生(不考虑两条规则时的)碰撞的次数,可用

Ci=Bi.count(Ai.(loc).contain(loc))

计算出来,即统计Bi中有多少locAi中出现过。

不过,这种两层循环计算会较慢,而我们知道a以及Ai相对于b是确定的,这样可以事先对Ai中的loc去重后建索引,改为

Ai’=Ai.id(loc).key@i(loc)
Ci=Bi.switch@i(loc,Ai’).len()

switch@i过滤掉在Ai中找不到locBi成员,同样可以得到碰撞次数。

我们只要统计Ci>0的时间切片个数即可得到满足规则1的碰撞次数。

类似地,可用

Di=Ai.m(-1).loc!=Bi.m(-1).loc //m(-1)表示取集合的最后成员

判断出在时间切片ia,b是否发生过不匹配。

有了Ci,Dia,b 的有效碰撞次数就很容易计算了

if(count(Di)<=20,count(Ci>0))

剩下就是针对该值计算TopN的常规任务了。

如果数据对no,ct有序,也很容易实现这个思路。A可以用二分法一把取出,然后从头遍历对象b,因为数据有序,每次取出对应的B很容易。AB都对ct有序时,可以用有序分组计算出Ai,Bi,且保证上述m(-1)的正确性。

可惜关系数据库无法保证数据有序存储,也没有相关的有序计算方法,只能写出非常绕的嵌套SQL

SPL有这种有序存储和相关的计算机制,容易实现。

基于这个思路,还有一些工程上的优化手段。

数据转换

no变成数,两个位置lacci合并成一个loc,并且序号化(原来是字符串,数字化时就顺便处理为序号了)。

转换后的数据结构如下

字段名称

数据类型

字段含义

示例数据

no

Long

唯一对象标识

100000000009

ct

Int

时间戳,精确到秒

1690819200

loc

Int

空间标记

10282

相比原数据结构,转存时做了以下两点变动:

1、将lacci两个字段合并为loc字段,并转换成Int型序号。原lacci作为维表单独存储。

2、将数字串no的数据类型变为Long型整数。

关联与序号化

前面分析中提到的每个时间切片的Ai建索引,但Ai太小了(平均长度在10左右),对于过小的集合使用索引的效果不明显。所以,我们在工程上改造成对整个A(长度约有1000)建索引,这样要把时间切片序号i也加到主键上,大致代码:

A’=A.derive((ct-st)\900:i).groups(i,loc).index()

其中st是时间区间的起点,即每900秒分出一个时间切片。

这时Ci的计算要变成先关联(过滤)再分组了:

B.derive((ct-st)\900:i).join@i(i:loc,A).groups(i;count(1):C)

这样就可以计算出以iCi为字段的序表,未碰撞的情况被join@i过滤掉了。

join@i使用索引实现关联过滤时,还是要计算HASH并比对,仍然有一定的计算量。其实我们知道,全部i,loc组合最多有7*96(每天9615分钟)*27万种可能,这并不是很大。如果用一个布尔值数组(序列)表示A在各个时间切片中是否在某个loc出现过,其长度最多也就是7*96*27万,内存完全可以装得下。这样,我们就可以用对位序列技术来实现关联过滤,避免HASH计算和比对时间,能更快速地计算Ci

aloc表示A的对位序列:

aloc=A.align@a(672,(ct-st)\900+1).(x=270000.(false),~.run(x(loc)=true),x)

因为有时间切片和位置两个维度,这里也使用了二层的对位序列。将A按时间切片分成6727*96)个 组,每组是个27万个布尔值成员的序列,对于时间切片i中在位置loc出现过的对象,可以简单地用aloc(i)(loc)迅速判断出是否与a发生过碰撞(即a是否也在时间切片i中在位置loc出现过)。

a在每个时间切片的最后位置,也可以用一个序列表示为:

alast=A.align@a(672,(ct-st)\900+1).(~.m(-1).loc)

alast(i)就是a在时间切片i的最后位置,同样可以简单地用时间切片序号访问,以便快速计算Di

按天分表

以上讲的算法要求数据对no,ct有序。但数据每天会新增,新增数据通常只会对ct有序甚至彻底无序。如果每次都要所有数据大排序就非常慢,即使只把新增数据排序再归并也要重写560亿行的数据,过于耗时。

SPL复组表可以将多个有序的组表逻辑上合并成一个更大的有序组表,这样每天一个分组表存储,计算时用复组表归并分表数据,归并后的数据也可以支持并行计算。避免了全量数据每天重写,复组表读取时会损失少量归并时间,但获得数据维护的灵活性还是值得的。

当历史数据过期时,直接将相应日期的分表文件删除就可以了,非常简单。

实践过程

准备实验数据

将数据按天存储,每天内数据noct有序,保存为列存组表,例如将7天数据,分别存为:1.day.ctx,…,7.day.ctx,由这7个分表可构成复组表,造数据脚本可以这样写:


A

B

C

1

=rand@s(1)

2

for n

=file("day"/A2/".btx")

3


=movefile(B2)

4


=elapse@s(sd,(A2-1)*86400)

5


=long(B4)\1000

6


for nm

=1000000.new(100000000000+rand(8000000):no,int(B5+rand(86400)):ct,int(rand(270000)+1):loc)

7



=B2.export@ab(C6)

8


=file(A2/".day.ctx").create@py(#no,#ct,loc)

9


=B2.cursor@b().sortx(#1,#2)

10


>B8.append@i(B9)

11


=movefile(B2)

参数值有3个:

1n,几天,举例:1,代表1

2nm,每天几百万,举例:1000,代表10亿

3sd,起始日期,举例:2023-08-01

B8建立组表时用了@p选项,表示按第一个字段no作为分段键。并行计算时需要对组表分段,不能把相同no的记录分到两段,使用@p选项可以在组表分段时保证这一点。

计算脚本


A

1

=now()

2

270000

3

=n*24*3600\pt

4

=file("day.ctx":to(n)).open()

5

=A4.cursor@m(ct,loc;no==src_no).fetch().align@a(A3,(ct-st)\pt+1)

6

=alast=A5.(~.m(-1).loc)

7

=aloc=A5.(x=A2.(false),~.run(x(loc)=true),x)

8

=A4.cursor@m(;no!=src_no).derive((ct-st)\pt+1:tn,aloc(tn)(loc):loca,alast(tn):lasta)

9

=A8.group@s(no,tn;lasta,count(loca):cnt,top@1(-1,0,loc):lastb)

10

=A9.group@s(no;count(cnt>0):cnt,count(lasta && lastb && lastb!=lasta):dcnt)

11

=A10.select(cnt>0 && dcnt<=A3).total(top(-20;cnt))

12

=file("app2_result.csv").export@ct(A11.new(src_no,no:dst_no,cnt:count))

13

=interval@ms(A1,now())

参数值有4个:

1src_no为对象a的标识,举例:100000000009

2st为起始时间戳(秒),举例:1690819200,对应2023-08-01 00:00:00

3n为统计天数,举例7

4pt为切片时间的秒数,举例900

A3:为统计时间区间内的总时间切片数

A5:读出对象a的数据,产生时间切片序号并按该序号分组。组表对no有序时,用no==src_no的条件可以迅速定位到目标数据。

A6:基于A5计算a在每个时间切片的最后位置值

A7:基于A5计算a在对位序列,前面已经解释过计算原理

A8:遍历其他(非a的)对象,生成时间切片序号tn(使用新符号与a区别)。对于每一条记录,从aloc中取出当前对象是否在时间切片tn和位置loc上和a发生碰撞记入loca,从alast中取出时间切片tn中对象a的最后位置值。

A9:按对象和时间切片分组,可以用lasta计算每个对象在时间切片中与a的碰撞次数cnt,即前面分析的Ci;并计算出该对象在该时间切片中最后的loc记为lastb

A10:进一步按对象分组,计算出该对象与a的(考虑规则1 后的)碰撞次数和不匹配次数。每个时间切片中的Ci>0即认定为一次碰撞,所以是count(cnt>0),记入新的cnt;最后位置不同时计算一次不匹配,记入dcnt

A11:过滤掉无效碰撞的对象后取有效碰撞次数最多的前20名。这里做实验时采用的条件dcnt<=A3,实际应该是dcnt<=20,因为随机生成的数据中几乎没有count(Di)<=20的,就会算出空集。而count(Di)最大值就是A3,可以保证总能统计出结果。这样的计算量会比针对实际数据会更大,用于测试性能只会吃亏。

编号化及还原

以上代码在造数据时,是按no已经整数化,且lac,ci被合为序号来写的,实际上先要做转换整理,完成计算后还要还原。具体介绍可以参考:数据转存时的整数化

实验效果

SPL使用单机(8C64G),计算总时间跨度7天(总数据量有560亿行),时间切片为15分钟,耗时为121秒。

实际上,达到这个性能还会少量使用SPL企业版中的列式运算选项,但因为不涉及原理分析,这里就不详述了。

后记

这是个典型的对象统计问题,这类问题一般有如下几个特点:

1. 统计满足某种条件的对象的个数

2. 对象的数量非常多,但每个对象涉及的数据量不多

3. 条件非常复杂,通常还和次序有关,需要一定的步骤才能判断出来

面对这类问题,一个常见的思路就是把数据按对象排序,逐步取出每个对象的数据进入内存,再来做复杂的条件判断。

现实中这种运算很常用,银行的帐户统计、电商的用户漏斗分析等等都是这种运算。

SQL很难实现这种运算,不保证有序存储,也缺乏有序运算,也很难写这些复杂判断。经常要写成很绕的嵌套语句,或者使用存储过程,无论如何,执行性能都会很差。

SPL则提供了有序存储及有序计算,也支持复杂的过程计算,能够很方便实现这类统计。