实时更新例程
1 应用场景
和【实时追加例程】的应用场景类似,所不同的是需要对数据进行更新。本例程适用于具有如下特征的数据表的更新:
对数据更新的实时性要求很高、数据更新周期很短、任意时刻均有可能更新;单次更新的数据量比较小,可以全内存存储。
2 实现思路
与追加例程的关键不同之处:
1、 新数据按组表的主键字段排序,接收到的新数据直接写入以当前时刻命名的0层分表。
2、 没有混乱期,也不存在补漏数据,不需要设置最大分表号参数。
3、 最高层只有唯一的主分表,最终数据都会归并到这个分表上,而不像追加例程中最高层分表也可能按时间分成多个。
4、 查询时没有参数,将所有分表号按时间顺序合并成一个序列返回。
3 约定与概念
与追加例程的关键不同之处:
1. 如有删除数据的需要,则配置一个删除标识字段,该字段值为true表示删除此记录,为false表示保留此记录。不需要删除记录的应用,把删除标识字段配置成null即可。
2. 归并模式:从低层向高层归并的时候采用复组表的更新机制归并。最高层只有一个主分表,提供一个手动归并线程往主分表归并。
3. 主分表号:由于主分表只有一个,所以分表号采用0/1交替,当前分表号为0时,数据归并到1;当前分表号为1时,数据归并到0。
4. 月累积:不设月累积参数,日层如果不是最高层,自动把日分表归并入月分表。
4 配置文件说明
配置文件ubc.json
该文件缺省位于集算器的主目录下(如想存到别的目录,请自行修改代码为绝对路径),内容如下:
[{ "sortKey":"account,tdate", "deleteField":"deleted", "otherFields":"ColName1,ColName2", "dataDir":"data/", "dataFilename":"data.ctx", "queryPeriod":120, "level":[0,1,2,3,4,5] , "blockSize":[65536,131072,262144,524288,1048576,1048576], "interval":[60,10,2,1], "discardZone":[], "discardTime":[] }] |
"sortKey": 主键字段名,多个英文逗号分隔。
"deleteField": 删除标识字段名,如果无删除操作,则填null。
"otherFields": 其它字段名,英文逗号分隔。
"dataDir": 组表存储路径,相对于主目录。
"dataFilename": 组表文件名,如"data.ctx"。
"queryPeriod": 最长查询周期,单位秒,当被弃用的分表弃用时长超过最长查询周期后,将会被删除
"level": 层级别,取值0,1,2,3,4,5分别代表0层、秒、分、时、日、月,除了0层,其它层级别可以跳跃,但是顺序只能从小到大,比如可以配置成[0,3,4,5], 但是不能配置成[0,2,1,3,4,5],顺序不能乱;也不能配置成[1,2,3,4,5],缺了0层。
"blockSize": 为每层分表的区块大小,一般最小设65536,最大设1048576,太小太大都不合适。区块大小是指当分表追加数据的时候,如果分表的容量不够了,每次扩充容量,会扩充一个区块的大小。当读分表数据的时候,不管分表里实际数据有多少,也是每次读一个区块。而分表是按列按区块存储的,一个字段至少需要两个区块来存储,一个区块用于存储块信息,另一个用于存储数据。因此当分表的字段数非常多的时候,如果读数据的时候需要同时读很多字段,此时需要占用的内存是【读的字段数*2区块大小】。区块设小了,占用内存小,但是如果数据量非常大,区块设大点读数快。因此,对于低层分表,由于数据量比较小,所以我们把区块设小点,节约内存;对于高层分表,数据量比较大,我们把区块设大点,读数快。
"interval":层区间,以层级别为单位的时间长度,0层和最高层没有(因为最高层只有一个分表),其它层根据实际需要填,只能是整数。1层填第一个,2层填第二个,其个数比level少两个,因为没有0层和最高层。
"discardZone": 被弃用的分表号列表,内容由程序自动产生,初始配置为[]。
"discardTime": 被弃用的时刻列表,内容由程序自动产生,初始配置为[]。
5 存储结构
主目录下的文件及子目录:
data:分表存储路径,目录名在ubc.json中配置,参见前面介绍
ubc.json:配置文件
zone.btx:用于存储各级分表号的文件
data目录下的文件如下所示:
文件名由"分表号.组表文件名.ctx"组成,其中组表文件名在ubc.json中配置,参见前面介绍,分表号由程序根据层级别自动计算,分表号的计算规则参见【约定与概念】中的介绍。
6 配置及存储举例
电商系统
用户数在100-1000万规模,帐户余额表随时都有变化,部分长期不用且余额为0的帐户,会被自动销户,也存在一些用户主动销户的现象。
这个例子更新数据量较大,如果等一个月才向主分表归并,会导致更新数据量太大,查询太慢,所以我们设计三层,分别为0层,1层为2小时,2层为1天,3层为月,即主分表层。
其配置文件如下:
[{ "sortKey":"Account,Ttime", "deleteField":"Deleted", "otherFields":"Balance,Currency", "dataDir":"data/", "dataFilename":"data.ctx", "queryPeriod":120, "level":[0, 3,4,5], "blockSize":[65536, 262144,524288,1048576], "interval":[2,1], "discardZone":[], "discardTime":[] }] |
7 全局变量
zones: 其结构为序列组成的序列,存储每个层正在用的分表号,其内容和zone.btx一致。
修改配置信息的锁:使用"config" 作为锁名。
修改zones的锁:使用"zones"作为锁名。
8 代码解析
8.1 init.splx
仅在服务器启动时执行一次,服务器运行过程中不再执行,如果服务器是第一次启动,则需要初始化参数,如果是第n次启动,则需要把第n-1次执行结束时写出的分表号读入
A |
B |
|
1 |
=file("zone.btx") |
|
2 |
if(A1.exists()) |
=A1.import@bi() |
3 |
else |
=json(file("ubc.json").read()) |
4 |
>B2=B3.level.len().([]) |
|
5 |
=env(zones,B2) |
|
6 |
=register("zone","zone.splx") |
|
7 |
=register("zone0","zone0.splx") |
|
8 |
=call@n("merge.splx") |
A1 读分表号存储文件
A2-B4 如果分表号文件存在,则读出,否则产生一个长度为层数的由空序列组成的序列
A5 声明全局变量zones,并将分表号序列赋值给它
A6 将zone.splx脚本登记为同名函数
A7 将zone0.splx脚本登记为同名函数
A8 启动合并线程
8.2 append.splx
用于每次收到新更新数据时,将新数据写出到0层,输入参数data为新更新数据序表,写出后将新的分表号添加到分表号列表。新数据要求传进来之前已经按组表的主键排好序
A |
|
1 |
=now() |
2 |
=lock("config") |
3 |
>config=json(file("ubc.json").read()) |
4 |
=lock@u("config") |
5 |
=zone0(A1) |
6 |
=file(config.dataDir/config.dataFilename:[A5]) |
7 |
=config.sortKey.split@c().("#"+trim(~)).concat@c() |
8 |
=if(config.deleteField, A6.create@d(${A7},${config.deleteField},${config.otherFields};;config.blockSize(1)), A6.create(${A7},${config.otherFields};;config.blockSize(1)) ) |
9 |
=A8.append@i(data.cursor()) |
10 |
=A8.close() |
11 |
=lock("zones") |
12 |
=zones(1)|A5 |
13 |
>zones=[A12]|zones.to(2,) |
14 |
=file("zone.btx").export@b(zones) |
15 |
=lock@u("zones") |
A1 当前时刻
A3 读配置文件
A5 用当前时刻算出0层分表号
A6-A8 产生0层分表,@d选项表示主键后第一个字段为删除标识字段,取值为true/false
A9 将更新数据写入分表
A11-A15 将新的分表号添加至列表,并备份至文件
备注:更新数据的字段顺序按如下规则:
有删除标识字段:主键字段,删除标识字段,其它字段
无删除标识字段:主键字段,其它字段
字段顺序和配置文件中的一致
8.3 zone0.splx
根据时间算0层分表号,内部使用。
输入参数:
tm 时间
A |
B |
|
1 |
2023 |
|
2 |
[27,23,18,13,7,1] |
|
3 |
return [year(tm)-A1+8, month(tm),day(tm),hour(tm)+1,minute(tm)+1,second(tm)+1].sum(shift(~,-A2(#)))+1 |
A1 数据的起始年份
A2 按年、月、日、时、分、秒的顺序,每个层级别右边的二进制位数
8.4 zone.splx
将低层分表号,转成高层分表号,内部使用。
输入参数:
z 低层分表号
n 高层层序号(在config.level中的序号,不是层级别)
config 配置文件内容
A |
B |
|
1 |
[27,23,18,13,7,1] |
|
2 |
23 |
|
3 |
=config.interval(n-1) |
|
4 |
>p = 7 - config.level(n) |
|
5 |
>p=if(monthCumulate && p==3,2,p) |
|
6 |
>b = A1(p) |
|
7 |
>z1 = shift(z,b) |
|
8 |
>b1 = A1(p-1)-b |
|
9 |
>s = (and( z1, shift(1,-b1)-1 )-1) \A3*A3 + 1 |
|
10 |
=shift(shift( shift(z1,b1),-b1)+s, -b) |
|
11 |
if(p>2) |
return A10 |
12 |
=and(shift(A10,A1(3)),shift(1,A1(3)-A1(2))-1) |
|
13 |
=and(shift(A10,A1(2)),shift(1,A1(2)-A1(1))-1) |
|
14 |
=shift(A10,A1(1))-8+A2 |
|
15 |
return A14*100000+A13*1000+A12*10 |
A1 按年月日时分秒的顺序,层级别右边的二进制位数
A2 数据的起始年份,只取后两位
A3 高层层区间
A4 根据高层的层级别算出其在A2中的位序号
A5 如果需要月累积,则改成月级别的位序号
A6 高层需要截短的位数
A7 分表号截短后的值
A8 截短后最后一级使用的位数
A9 将最后一级的值整除层区间加1
A10 将s替换进z中,末尾再补0
A11 如果不需要月累积,直接返回分表号
A12-A14 分别算出年月日的分量
A15 拼出明文分表号并返回
8.5 merge.splx
定期将n-1层的数据用更新机制归并到n层,每次执行完都回到1层,只有低层无归并操作才往高层循环,全部归并完休眠一层的层区间后,再次执行。
A |
B |
C |
D |
E |
F |
|
1 |
=lock("config") |
|||||
2 |
>config=json(file("ubc.json").read()) |
|||||
3 |
=long(now())-config.queryPeriod*1000 |
|||||
4 |
=config.discardTime.pselect@a(~<A3) |
|||||
5 |
=config.discardZone(A4).run(movefile(config.dataDir/~/"."/config.dataFilename)) |
|||||
6 |
>config.discardZone.delete(A4),config.discardTime.delete(A4) |
|||||
7 |
=file("ubc.json").write(json(config)) |
|||||
8 |
=lock@u("config") |
|||||
9 |
= zone0(now(),config ) |
|||||
10 |
=config.level.len()-2 |
|||||
11 |
||||||
12 |
for A10 |
|||||
13 |
>zz = zones(A12) |
|||||
14 |
>z =zone(A9,A12+1,config,false) |
|||||
15 |
>zm = zz.select(~< z) |
|||||
16 |
if zm.len()>0 |
|||||
17 |
if(config.level(A12+1)==4) |
|||||
18 |
=zm.group(zone( ~, A12+1,config,true):zu;~:zd) |
|||||
19 |
else |
|||||
20 |
>D18=zm.group(zone( ~, A12+1,config,false):zu;~:zd) |
|||||
21 |
>zus = zones(A12+1) |
|||||
22 |
=[] |
=[] |
||||
23 |
for D18 |
|||||
24 |
=zus.select@1(xor(~,C23.zu)<=1) |
|||||
25 |
=if(D24,xor(D24,1),C23.zu) |
|||||
26 |
=file(config.dataDir/config.dataFilename:(D24|C23.zd)) |
|||||
27 |
=file(config.dataDir/config.dataFilename:D25) |
|||||
28 |
=D26.reset@w(D27:config.blockSize(#A12+1)) |
|||||
29 |
=lock("config") |
|||||
30 |
>config=json(file("ubc.json").read()) |
|||||
31 |
>config.discardZone.insert(0,(D24|C23.zd)) |
|||||
32 |
>config.discardTime.insert(0,[long(now())]*(D24|C23.zd).len()) |
|||||
33 |
=file("ubc.json").write(json(config)) |
|||||
34 |
=lock@u("config") |
|||||
35 |
>C22=C22|D24,D22=D22|D25 |
|||||
36 |
=lock("zones") |
|||||
37 |
=zones(A12)\zm |
|||||
38 |
=(zones(A12+1)\C22)|C23 |
|||||
39 |
>zones=zones.(~).modify(A12,[C37,C38]) |
|||||
40 |
=file("zone.btx").export@b(zones) |
|||||
41 |
=lock@u("zones") |
|||||
42 |
goto A1 |
|||||
43 |
=config.interval(1) |
|||||
44 |
=sleep(case(config.level(2),2:A43*60,3:A43*3600,4:A43*3600*24;A43)*1000) |
|||||
45 |
goto A1 |
A2 读配置文件
A3-A6 将弃用时长超过最长查询周期的分表删除
A9 用当前时刻算出其在0层的分表号
A10 总层数减2
A12 从0层开始循环
B13 当前层的分表号列表
B14 用A9算出其在目标层的分表号
B15 选出小于B14的当前层分表号
B16 如果当前层有分表存在
C17如果是往日层归并
D18 按月分表号分组
C19 否则
D20 按目标层分表号分组
C22-D22 暂存弃用分表号和新产生的分表号
C23 按组循环
D24 从zus中读取当前组所属的目标层分表号(含末位0,1两种)
D25 算出新分表号:如果当前所属目标层分表号存在,则新分表号末位交替(原末位1则新末位0;原末位0则新末位1),否则就用当前组新算出的分表号
D26-D28 把当前组的分表数据合并其所属的目标层原分表数据,一起写入新分表中
D29-D34 将写出完毕的分表号写入弃用分表号列表中,并记录弃用时间
D35 将弃用分表号和新产生的分表号暂存至C22和D22
C36-C41 更新分表号列表,并把列表备份到文件
8.6 manualMerge.splx
手动把倒数第二层分表归并到最高层的主分表
输入参数:
t 查询时刻,此时刻之前完成的分表才被归并
A |
B |
|
1 |
=lock("config") |
|
2 |
>config=json(file("ubc.json").read()) |
|
3 |
=lock@u("config") |
|
4 |
=config.level.len() |
|
5 |
if t!=null |
=zone0(t,config) |
6 |
=zone(B5,A4-1,config,true) |
|
7 |
=zones.m(-2).select(~<B6) |
|
8 |
else |
>B7=zones.m(-2) |
9 |
=zones(A4) |
|
10 |
=if(A9.len()==1,xor(A9(1),1),0) |
|
11 |
=file(config.dataDir/config.dataFilename:(A9|B7)) |
|
12 |
=file(config.dataDir/config.dataFilename:A10) |
|
13 |
=A11.reset@w(A12:config.blockSize.m(-1)) |
|
14 |
=lock("config") |
|
15 |
>config=json(file("ubc.json").read()) |
|
16 |
>config.discardZone.insert(0,(A9|B7)) |
|
17 |
>config.discardTime.insert(0,[long(now())]*(A9|B7).len()) |
|
18 |
=file("ubc.json").write(json(config)) |
|
19 |
=lock@u("config") |
|
20 |
=lock("zones") |
|
21 |
=zones(A4-1)\B7 |
|
22 |
=(zones(A4)\A9)|A10 |
|
23 |
>zones=zones.(~).modify(A4-1,[A21,A22]) |
|
24 |
=file("zone.btx").export@b(zones) |
|
25 |
=lock@u("zones") |
A5-B8 如果t不为空,则选出t时刻之前完成的分表号,否则选出全部分表号
A9 原主分表号
A10 新主分表号
A11-A13 把原主分表和倒数第二层的分表归并后写入新的主分表
A14-A19 把被归并完的分表号写入弃用列表并记录弃用时刻
A20-A25 更新分表号列表,并把之备份至文件
8.7 query.splx
查询数据时使用,返回所有分表号
A |
|
1 |
return zones.rvs().conj() |
说明:
1、使用复组表后必须关闭,否则merge.splx做完合并操作后无法删除分表
2、使用复组表产生游标时,必须使用@w选项,即cursor@w(),表示分表之间采用更新机制归并
例程代码下载:code.zip
英文版