基于对象 - 事件模式的数据计算问题

基于对象 - 事件模式(schema)的数据计算,可以是说商业中最常见的一种数据分析任务。这里说的对象可以是电商系统用户、游戏玩家、银行账号、手机、车辆等等,通常会有个唯一的 ID,对象涉及的事件都记录在这个 ID 下,比如手机的通话记录、用户的操作日志、银行账号的交易记录等。有时候 ID 会复杂一些,不一定是一个单一对象。比如 ERP 系统中统计仓库中商品的库龄,ID 会是仓库和商品的组合,事件则是商品的入库和出库动作,总会同时涉及仓库和商品。
有了事件数据后,我们就可以进行各种各样的统计。一个比较常见的的任务就是统计指定时间段内、涉及事件满足某种条件的 ID 的数量,更一般的说法是计算每个 ID(在指定时间段内)的涉及事件的某些聚合值,然后再基于这些聚合值做 ID 的整体统计。统计事件满足某种条件的 ID 数量,可以看成这个聚合值是布尔值(真 / 假)的情况(再统计真值的数量)。

有些聚合计算比较简单,不涉及事件发生的次序,只是统计满足某些条件的事件的发生次数或事件信息的合计值(次数本质上是在合计 1),比如银行账号的交易额超过 1 万元的交易次数、节假日发生的交易额,手机通话时间不超过 3 秒的次数,游戏用户购买某类装备的金额,…。我们可以把这类任务称为无序计算。而事件通常都是有发生时刻属性,也就有先后次序,对应地,还会有更多且更有业务意义的有序计算,也就聚合目标会和事件的发生时刻及次序相关。
比较著名的例子就是电商漏斗分析。给定一个步骤序列(比如浏览商品、下单、付款),针对每个用户找出一个短的时间窗口内(比如 3 天),在其中该用户按次序实施了这个步骤序列的最多步数(可能 0 步)。类似地,计算每张信用卡是否连续三天都有超过 1000 元的交易(这里的聚合值是个布尔值),新注册的游戏用户下次登录时刻的间隔天数,…。
计算出 ID 相关的聚合值后,再进一步统计所有 ID 的整体情况就比较简单了。比如漏斗分析中,有了每个 ID 实施的(指定步骤的)最多步数,就可以统计出走到每一步的 ID 数量(只要简单计数),进而分析哪一步的用户流失率最严重。这就是有业务意义的数据分析了。

可以想像出,相当大比例的业务数据都可以抽象成这种 ID+ 事件的模式,所以说基于 ID 的事件数据计算是最常见的数据分析任务。
然而,SQL 并不擅长实现这种统计任务,简单的无序计算问题还不大,但面对更重要的有序计算就会显得非常力不从心。

要解释这个问题,我们先要总结出这种事件数据计算的几个特征:
1. ID 数量非常多,少则数千万,多则几亿甚至几十亿
2. 同一 ID 的事件数量并不多,一般几到几百条,再多也就是几千条;
3. 针对这些事件的聚合计算可能很复杂,特别是有序计算,几乎不可能用一个简单的聚合函数写出来,经常需要多个步骤才能完成计算
4. 计算聚合值不会用到其它 ID 的事件数据,也就是 ID 之间是无关的。
有些计算目标看起来不满足特征 4,比如时空碰撞任务需要计算出某个手机(或车辆)在同一时间片段和空间范围出现次数最多的其它手机号,这看起来像是两个 ID 的事件数据一起参与计算,但实际上目标手机是确定的,它的事件数据可以事先取出后被认为是常数,每个其它手机号的事件数据事实上都在和这套常数一起计算,仍然可以认为 ID 之间无关。

SQL 的难点主要是两个方面。
ID 相关事件的聚合计算,会涉及多条互相依赖的事件记录。SQL 对这种跨行记录运算的支持力度很弱,即使有了窗口函数仍然很不方便,通常还是要借助 JOIN 将跨行记录拼到一行中才能进一步做更复杂的判断。计算过程中涉及的事件数量越多,参与 JOIN 的子查询(用于筛选出合适的事件记录 )也会越多,而且还会有依赖性(比如漏斗分析中第二步要第一步的基础的寻找),导致子查询本身也要用 JOIN 来实现事件筛选。而且,这些子查询的基础都是整个事件表,再用 ID 相等及其它筛选条件作为 JOIN 条件,而事件表常常非常巨大(ID 本身就非常多,每个 ID 还会有多条事件),大表 JOIN 不仅计算速度低下,而且也很容易跑崩,即使借助分布式系统也不容易做好。
有些事件还可能有更大的子表,比如订单表会有订单明细,实际聚合计算会更复杂,而且 JOIN 涉及的数据量也更大,这会进一步加剧上述困境。
有时 SQL 中也会用 EXISTS 来实现某些存在性的聚合计算结果,EXISTS 中 FROM 的表仍然是这个巨大的事件表,再用 ID 和主查询的 ID 相同及再加上其它筛选条件来判断,本质上和 JOIN 区别不大(事实上,大多数 EXISTS 都会被数据库优化成 JOIN 来实现,否则就计算复杂度太高了)。复杂的 EXISTS 子句的理解难度更大,优化难度也更大,这时如果难以被优化器转换成 JOIN,计算量就非常可怕了。

ID 相关的聚合值,和 ID 的关系是一对一的,也就是每个 ID 对应一套聚合值。而 JOIN 的结果并没有这个特征(EXISTS 这方面略好,但又有前述难以优化的问题),所以还要再做一次 GROUP BY ID 才能把结果的维度计算正确。而 ID 数量非常多,大结果集分组也是个性能非常差的计算任务。
有时候最后的统计是对 ID 的计数,GROUP BY 就会退化成 COUNT DISTINCT,计算逻辑会简单一些,但复杂度的数量级并没有变(DISTINCT 相当于没有聚合值的 GROUP BY,COUNT DISTINCT 则是在 DISTINCT 基础上再计数)。SQL 中绝大多数计算慢的 COUNT DISTINCT 都是因为这类事件数据计算任务造成的。

这是一个 SQL 写的简化后的三步漏斗分析,感受一下其中的 JOIN 以及 GROUP BY。

WITH e1 AS (
    SELECT uid,1 AS step1, MIN(etime) AS t1
    FROM events
    WHERE etime>=end_date-14 AND etime<end_date AND etype='etype1'
    GROUP BY uid),
e2 AS (
    SELECT uid,1 AS step2, MIN(e1.t1) as t1, MIN(e2.etime) AS t2
    FROM events AS e2 JOIN e1 ON e2.uid = e1.uid
    WHERE e2.etime>=end_date-14 AND e2.etime<end_date AND e2.etime>t1 AND e2.etime<t1+7 AND etype='etype2'
    GROUP BY uid),
e3 as (
    SELECT uid,1 AS step3, MIN(e2.t1) as t1, MIN(e3.etime) AS t3
    FROM events AS e3 JOIN e2 ON e3.uid = e2.uid
    WHERE e3.etime>=end_date-14 AND e3.etime<end_date AND e3.etime>t2 AND e3.etime<t1+7 AND etype='etype3'
    GROUP BY 1)
SELECT SUM(step1) AS step1, SUM(step2) AS step2, SUM(step3) AS step3
FROM e1 LEFT JOIN e2 ON e1.uid = e2.uid LEFT JOIN e3 ON e2.uid = e3.uid

更多步的漏斗就要写更多的子查询来 JOIN。

还有一个规则更复杂些的漏斗,最后统计还涉及 GROUP BY 和 COUNT(DISTINCT):

WITH e1 AS (
    SELECT userid, visittime AS step1_time, MIN(sessionid) AS sessionid, 1 AS step1
    FROM events e1 JOIN eventgroup ON eventgroup.id = e1.eventgroup
    WHERE visittime >= DATE_ADD(arg_date,INTERVAL -14 day) AND visittime < arg_date AND eventgroup.name = 'SiteVisit'
    GROUP BY userid,visittime
), e2 AS (
    SELECT e2.userid, MIN(e2.sessionid) AS sessionid, 1 AS step2, MIN(visittime) AS step2_time, MIN(e1.step1_time) AS step1_time
    FROM events e2 JOIN e1 ON e1.sessionid = e2.sessionid AND visittime > step1_time JOIN eventgroup ON eventgroup.id = e2.eventgroup
    WHERE visittime < DATE_ADD(step1_time ,INTERVAL +1 day) AND eventgroup.name = 'ProductDetailPage'
    GROUP BY e2.userid
), e3 AS (
    SELECT e3.userid, MIN(e3.sessionid) AS sessionid, 1 AS step3, MIN(visittime) AS step3_time, MIN(e2.step1_time) AS step1_time
    FROM events e3 JOIN e2 ON e2.sessionid = e3.sessionid AND visittime > step2_time JOIN eventgroup ON eventgroup.id = e3.eventgroup
    WHERE visittime < DATE_ADD(step1_time ,INTERVAL +1 day) AND (eventgroup.name = 'OrderConfirmationType1')
    GROUP BY e3.userid
)
SELECT s.devicetype AS devicetype,
    COUNT(DISTINCT CASE WHEN fc.step1 IS NOT NULL THEN fc.step1_userid  ELSE NULL END) AS step1_count,
    COUNT(DISTINCT CASE WHEN fc.step2 IS NOT NULL THEN fc.step2_userid  ELSE NULL END) AS step2_count,
    COUNT(DISTINCT CASE WHEN fc.step3 IS NOT NULL THEN fc.step3_userid  ELSE NULL END) AS step3_count,
FROM (
    SELECT e1.step1_time AS step1_time, e1.userid AS userid, e1.userid AS step1_userid, e2.userid AS step2_userid,e3.userid AS step3_userid,
           e1.sessionid AS step1_sessionid, step1, step2, step3
    FROM e1 LEFT JOIN e2 ON e1.userid=e2.userid LEFT JOIN e3 ON e2.userid=e3.userid ) fc
LEFT JOIN sessions s ON fc.step1_sessionid = s.id 
GROUP BY s.devicetype

其实,只要利用上面说的那几条特征,事件数据统计任务并不难做。
如果我们把事件数据按 ID 排序,每次读出一个 ID 对应的事件到内存,这占不了多少内存(特征 2),然后再用分步计算出这个 ID 对应的聚合值,内存使用过程化的语言可以容易进行非常复杂的计算(特征 2)。这样,不会有大表 JOIN,关联运算仅局限在一个 ID 所属的事件范围内(特征 4)。因为每次针对一个 ID 计算完相应的聚合值,后续也没有 GROUP BY,COUNT DISTINCT 会变成简单的 COUNT。
这个算法完全避免了大表 JOIN 和大结果集的 GROUP BY,不仅占用内存很小,而且还很容易并行。而大表 JOIN 和大结果集 GROUP BY 都属于消耗内存巨大且并行成本很高的运算。
可惜,用 SQL 无法实现这样的算法,主要原因有两点:1. SQL 缺乏离散性,不能用过程化语句写出复杂的跨行运算逻辑,就只能借助与 JOIN(或 EXISTS);2. 关系代数中的集合无序,数据表中的数据也是无序的,即使刻意有序存储 SQL 也无法利用。
SPL 强化了离散性,可以方便地写出多步骤的跨行运算,特别是对次序有关的运算支持非常好;SPL 的理论基础离散数据集基于有序集合,能够刻意保证存储的次序,而且提供有序游标语法,可以一次读入一个 ID 的数据。

用 SPL 实现上面相同的漏斗运算:


A

1

=["etype1","etype2","etype3"]

2

=file("event.ctx").open()

3

=A2.cursor(id,etime,etype;etime>=end_date-14 && etime<end_date && A1.contain(etype) )

4

=A3.group(uid)

5

=A4.(~.sort(etime)).new(~.select@1(etype==A1(1)):first,~:all).select(first)

6

=A5.(A1.(t=if(#==1,t1=first.etime,if(t,all.select@1(etype==A1.~ && etime>t && etime<t1+7).etime, null))))

7

=A6.groups(;count(~(1)):step1,count(~(2)):step2,count(~(3)):step3)

event.ctx 按 uid 有序存储,A4 就可以每次读入一个 ID 的(指定时间段内的)所有事件。多步漏斗的运算逻辑由后面的 A5/A6 两句实现,只要针对内存中当前 ID 的事件处理就可以了,按自然思路写出来,没有 JOIN 动作。后面也没有 GROUP BY,最后 A7 只要简单的计数就可以了。
这个代码对任意频数的漏斗都通用,只要改变 A1 即可。

另一个也类似:


A

1

=eventgroup=file("eventgroup.btx").import@b()

2

=st=long(elapse(arg_date,-14)),et=long(arg_date),eet=long(arg_date+1)

3

=A1.(case(NAME,"SiteVisit":1,"ProductDetailPage":2,"OrderConfirmationType1":3;null))

4

=file("events.ctx").open()

5

=A4.cursor@m(USERID,SESSIONID,VISITTIME,EVENTGROUP;VISITTIME>=st && VISITTIME<eet,EVENTGROUP:A3:#)

6

=file("sessions.ctx").open().cursor@m(USERID,ID,DEVICETYPE;;A5)

7

=A5.joinx@m(USERID:SESSIONID,A6:USERID:ID,DEVICETYPE)

8

=A7.group(USERID)

9

=A8.new(~.align@a(3,EVENTGROUP):e,e(1).select(VISITTIME<et).group@u1(VISITTIME):e1,e(2).group@o(SESSIONID):e2,e(3):e3)

10

=A9.run(e=join@m(e1:e1,SESSIONID;e2:e2,SESSIONID).select(e2=e2.select(VISITTIME>e1.VISITTIME && VISITTIME<e1.VISITTIME+86400000).min(VISITTIME)))

11

=A10.run(e0=e1.id(DEVICETYPE),e1=e.min(e1.VISITTIME),e2=e.min(e2),e=e.min(e1.SESSIONID),e3=e3.select(SESSIONID==e && VISITTIME>e2 && VISITTIME<e1+86400000).min(VISITTIME),e=e0)

12

=A11.news(e;~:DEVICETYPE,e2,e3).groups(DEVICETYPE;count(1):STEP1_COUNT,count(e2):STEP2_COUNT,count(e3):STEP3_COUNT)

在 A6 读入一个 ID 的所有事件,然后来实现复杂判断逻辑,最后分组统计时也只要简单计数就可以了,不用再考虑去重了。

这个算法依赖于事件数据对 ID 有序,而事件产生次序通常而是发生时刻。那么,是不是只能应用于事先排序过的历史数据上,对来不及一起排序的实时数据就无效了呢?
SPL 已经考虑到这一点,SPL 的复组表可以在数据进入时实现增量排序,实时保证数据在读出时对 ID 有序,可以让这个算法应用到最新的数据上。而且,因为这类运算的条件通常都会有一个时间区间,SPL 的存储还支持双维有序机制,可以迅速将时间区间外的数据过滤掉,大幅度减少数据遍历。
排序确实是时间成本较高的运算,但是一次性的,一旦完成了排序,之后的各种运算都会变得非常快。而且,SPL 复组表的数据组织机制相当于把大排序拆成多次可实时进行的小排序,将排列时间分散到日常的数据维护中,除了第一次迁移系统时会有个较长时间的排序外,日后数据不断追加过程的排序时间基本无感,而获得的计算时间提升却是数量级的。