实时更新例程

1 应用场景

和【实时追加例程】的应用场景类似,所不同的是需要对数据进行更新。本例程适用于具有如下特征的数据表的更新:

对数据更新的实时性要求很高、数据更新周期很短、任意时刻均有可能更新;单次更新的数据量比较小,可以全内存存储。

2 实现思路

与追加例程的关键不同之处:

1 新数据按组表的主键字段排序,接收到的新数据直接写入以当前时刻命名的0层分表。

2 没有混乱期,也不存在补漏数据,不需要设置最大分表号参数。

3 最高层只有唯一的主分表,最终数据都会归并到这个分表上,而不像追加例程中最高层分表也可能按时间分成多个。

4 查询时没有参数,将所有分表号按时间顺序合并成一个序列返回。

3 约定与概念

与追加例程的关键不同之处:

1. 如有删除数据的需要,则配置一个删除标识字段,该字段值为true表示删除此记录,为false表示保留此记录。不需要删除记录的应用,把删除标识字段配置成null即可。

2. 归并模式:从低层向高层归并的时候采用复组表的更新机制归并。最高层只有一个主分表,提供一个手动归并线程往主分表归并。

3. 主分表号:由于主分表只有一个,所以分表号采用0/1交替,当前分表号为0时,数据归并到1;当前分表号为1时,数据归并到0

4. 月累积:不设月累积参数,日层如果不是最高层,自动把日分表归并入月分表。

4 配置文件说明

配置文件ubc.json

该文件缺省位于集算器的主目录下(如想存到别的目录,请自行修改代码为绝对路径),内容如下:

[{ "sortKey":"account,tdate",

"deleteField":"deleted",

"otherFields":"ColName1,ColName2",

"dataDir":"data/",

"dataFilename":"data.ctx",

"queryPeriod":120,

"level":[0,1,2,3,4,5] ,

"blockSize":[65536,131072,262144,524288,1048576,1048576],

"interval":[60,10,2,1],

"discardZone":[],

"discardTime":[]

}]

"sortKey": 主键字段名,多个英文逗号分隔。

"deleteField": 删除标识字段名,如果无删除操作,则填null

"otherFields": 其它字段名,英文逗号分隔。

"dataDir": 组表存储路径,相对于主目录。

"dataFilename": 组表文件名,如"data.ctx"

"queryPeriod": 最长查询周期,单位秒,当被弃用的分表弃用时长超过最长查询周期后,将会被删除

"level": 层级别,取值0,1,2,3,4,5分别代表0层、秒、分、时、日、月,除了0层,其它层级别可以跳跃,但是顺序只能从小到大,比如可以配置成[0,3,4,5], 但是不能配置成[0,2,1,3,4,5],顺序不能乱;也不能配置成[1,2,3,4,5],缺了0层。

"blockSize": 为每层分表的区块大小,一般最小设65536,最大设1048576,太小太大都不合适。区块大小是指当分表追加数据的时候,如果分表的容量不够了,每次扩充容量,会扩充一个区块的大小。当读分表数据的时候,不管分表里实际数据有多少,也是每次读一个区块。而分表是按列按区块存储的,一个字段至少需要两个区块来存储,一个区块用于存储块信息,另一个用于存储数据。因此当分表的字段数非常多的时候,如果读数据的时候需要同时读很多字段,此时需要占用的内存是【读的字段数*2区块大小】。区块设小了,占用内存小,但是如果数据量非常大,区块设大点读数快。因此,对于低层分表,由于数据量比较小,所以我们把区块设小点,节约内存;对于高层分表,数据量比较大,我们把区块设大点,读数快。

"interval":层区间,以层级别为单位的时间长度,0层和最高层没有(因为最高层只有一个分表),其它层根据实际需要填,只能是整数。1层填第一个,2层填第二个,其个数比level少两个,因为没有0层和最高层。

"discardZone": 被弃用的分表号列表,内容由程序自动产生,初始配置为[]

"discardTime": 被弃用的时刻列表,内容由程序自动产生,初始配置为[]

5 存储结构

主目录下的文件及子目录:

..

data:分表存储路径,目录名在ubc.json中配置,参见前面介绍

ubc.json:配置文件

zone.btx:用于存储各级分表号的文件

data目录下的文件如下所示:

..

文件名由"分表号.组表文件名.ctx"组成,其中组表文件名在ubc.json中配置,参见前面介绍,分表号由程序根据层级别自动计算,分表号的计算规则参见【约定与概念】中的介绍。

6 配置及存储举例

电商系统

用户数在100-1000万规模,帐户余额表随时都有变化,部分长期不用且余额为0的帐户,会被自动销户,也存在一些用户主动销户的现象。

这个例子更新数据量较大,如果等一个月才向主分表归并,会导致更新数据量太大,查询太慢,所以我们设计三层,分别为0层,1层为2小时,2层为1天,3层为月,即主分表层。

其配置文件如下:

[{ "sortKey":"Account,Ttime",

"deleteField":"Deleted",

"otherFields":"Balance,Currency",

"dataDir":"data/",

"dataFilename":"data.ctx",

"queryPeriod":120,

"level":[0, 3,4,5],

"blockSize":[65536, 262144,524288,1048576],

"interval":[2,1],

"discardZone":[],

"discardTime":[]

}]

7 全局变量

zones: 其结构为序列组成的序列,存储每个层正在用的分表号,其内容和zone.btx一致。

修改配置信息的锁:使用"config" 作为锁名。

修改zones的锁:使用"zones"作为锁名。

8 代码解析

8.1 init.splx

仅在服务器启动时执行一次,服务器运行过程中不再执行,如果服务器是第一次启动,则需要初始化参数,如果是第n次启动,则需要把第n-1次执行结束时写出的分表号读入


A

B

1

=file("zone.btx")

2

if(A1.exists())

=A1.import@bi()

3

else

=json(file("ubc.json").read())

4


>B2=B3.level.len().([])

5

=env(zones,B2)

6

=register("zone","zone.splx")

7

=register("zone0","zone0.splx")

8

=call@n("merge.splx")

A1 读分表号存储文件

A2-B4 如果分表号文件存在,则读出,否则产生一个长度为层数的由空序列组成的序列

A5 声明全局变量zones,并将分表号序列赋值给它

A6 zone.splx脚本登记为同名函数

A7 zone0.splx脚本登记为同名函数

A8 启动合并线程

8.2 append.splx

用于每次收到新更新数据时,将新数据写出到0层,输入参数data为新更新数据序表,写出后将新的分表号添加到分表号列表。新数据要求传进来之前已经按组表的主键排好序


A

1

=now()

2

=lock("config")

3

>config=json(file("ubc.json").read())

4

=lock@u("config")

5

=zone0(A1)

6

=file(config.dataDir/config.dataFilename:[A5])

7

=config.sortKey.split@c().("#"+trim(~)).concat@c()

8

=if(config.deleteField, A6.create@d(${A7},${config.deleteField},${config.otherFields};;config.blockSize(1)), A6.create(${A7},${config.otherFields};;config.blockSize(1)) )

9

=A8.append@i(data.cursor())

10

=A8.close()

11

=lock("zones")

12

=zones(1)|A5

13

>zones=[A12]|zones.to(2,)

14

=file("zone.btx").export@b(zones)

15

=lock@u("zones")

A1 当前时刻

A3 读配置文件

A5 用当前时刻算出0层分表号

A6-A8 产生0层分表,@d选项表示主键后第一个字段为删除标识字段,取值为true/false

A9 将更新数据写入分表

A11-A15 将新的分表号添加至列表,并备份至文件

备注:更新数据的字段顺序按如下规则:

有删除标识字段:主键字段,删除标识字段,其它字段

无删除标识字段:主键字段,其它字段

字段顺序和配置文件中的一致

8.3 zone0.splx

根据时间算0层分表号,内部使用。

输入参数:

tm 时间


A

B

1

2023

2

[27,23,18,13,7,1]

3

return [year(tm)-A1+8, month(tm),day(tm),hour(tm)+1,minute(tm)+1,second(tm)+1].sum(shift(~,-A2(#)))+1

A1 数据的起始年份

A2 按年、月、日、时、分、秒的顺序,每个层级别右边的二进制位数

8.4 zone.splx

将低层分表号,转成高层分表号,内部使用。

输入参数:

z 低层分表号

n 高层层序号(config.level中的序号,不是层级别)

config 配置文件内容


A

B

1

[27,23,18,13,7,1]


2

23


3

=config.interval(n-1)


4

>p = 7 - config.level(n)


5

>p=if(monthCumulate && p==3,2,p)


6

>b = A1(p)


7

>z1 = shift(z,b)


8

>b1 = A1(p-1)-b


9

>s = (and( z1, shift(1,-b1)-1 )-1) \A3*A3 + 1


10

=shift(shift( shift(z1,b1),-b1)+s, -b)


11

if(p>2)

return A10

12

=and(shift(A10,A1(3)),shift(1,A1(3)-A1(2))-1)


13

=and(shift(A10,A1(2)),shift(1,A1(2)-A1(1))-1)


14

=shift(A10,A1(1))-8+A2


15

return A14*100000+A13*1000+A12*10


A1 按年月日时分秒的顺序,层级别右边的二进制位数

A2 数据的起始年份,只取后两位

A3 高层层区间

A4 根据高层的层级别算出其在A2中的位序号

A5 如果需要月累积,则改成月级别的位序号

A6 高层需要截短的位数

A7 分表号截短后的值

A8 截短后最后一级使用的位数

A9 将最后一级的值整除层区间加1

A10 s替换进z中,末尾再补0

A11 如果不需要月累积,直接返回分表号

A12-A14 分别算出年月日的分量

A15 拼出明文分表号并返回

8.5 merge.splx

定期将n-1层的数据用更新机制归并到n层,每次执行完都回到1层,只有低层无归并操作才往高层循环,全部归并完休眠一层的层区间后,再次执行。


A

B

C

D

E

F

1

=lock("config")

2

>config=json(file("ubc.json").read())

3

=long(now())-config.queryPeriod*1000

4

=config.discardTime.pselect@a(~<A3)

5

=config.discardZone(A4).run(movefile(config.dataDir/~/"."/config.dataFilename))

6

>config.discardZone.delete(A4),config.discardTime.delete(A4)

7

=file("ubc.json").write(json(config))

8

=lock@u("config")

9

= zone0(now(),config )

10

=config.level.len()-2

11


12

for A10

13


>zz = zones(A12)

14


>z =zone(A9,A12+1,config,false)

15


>zm = zz.select(~< z)

16


if zm.len()>0

17



if(config.level(A12+1)==4)

18




=zm.group(zone( ~, A12+1,config,true):zu;~:zd)

19



else

20




>D18=zm.group(zone( ~, A12+1,config,false):zu;~:zd)

21



>zus = zones(A12+1)

22



=[]

=[]

23



for D18

24




=zus.select@1(xor(~,C23.zu)<=1)

25




=if(D24,xor(D24,1),C23.zu)

26




=file(config.dataDir/config.dataFilename:(D24|C23.zd))

27




=file(config.dataDir/config.dataFilename:D25)

28




=D26.reset@w(D27:config.blockSize(#A12+1))

29




=lock("config")

30




>config=json(file("ubc.json").read())

31




>config.discardZone.insert(0,(D24|C23.zd))

32




>config.discardTime.insert(0,[long(now())]*(D24|C23.zd).len())

33




=file("ubc.json").write(json(config))

34




=lock@u("config")

35




>C22=C22|D24,D22=D22|D25

36


=lock("zones")

37


=zones(A12)\zm

38


=(zones(A12+1)\C22)|C23

39


>zones=zones.(~).modify(A12,[C37,C38])

40


=file("zone.btx").export@b(zones)

41


=lock@u("zones")

42


goto A1

43

=config.interval(1)

44

=sleep(case(config.level(2),2:A43*60,3:A43*3600,4:A43*3600*24;A43)*1000)

45

goto A1

A2 读配置文件

A3-A6 将弃用时长超过最长查询周期的分表删除

A9 用当前时刻算出其在0层的分表号

A10 总层数减2

A12 0层开始循环

B13 当前层的分表号列表

B14 A9算出其在目标层的分表号

B15 选出小于B14的当前层分表号

B16 如果当前层有分表存在

C17如果是往日层归并

D18 按月分表号分组

C19 否则

D20 按目标层分表号分组

C22-D22 暂存弃用分表号和新产生的分表号

C23 按组循环

D24 zus中读取当前组所属的目标层分表号(含末位01两种)

D25 算出新分表号:如果当前所属目标层分表号存在,则新分表号末位交替(原末位1则新末位0;原末位0则新末位1),否则就用当前组新算出的分表号

D26-D28 把当前组的分表数据合并其所属的目标层原分表数据,一起写入新分表中

D29-D34 将写出完毕的分表号写入弃用分表号列表中,并记录弃用时间

D35 将弃用分表号和新产生的分表号暂存至C22D22

C36-C41 更新分表号列表,并把列表备份到文件

8.6 manualMerge.splx

手动把倒数第二层分表归并到最高层的主分表

输入参数:

t 查询时刻,此时刻之前完成的分表才被归并


A

B

1

=lock("config")

2

>config=json(file("ubc.json").read())

3

=lock@u("config")

4

=config.level.len()

5

if t!=null

=zone0(t,config)

6


=zone(B5,A4-1,config,true)

7


=zones.m(-2).select(~<B6)

8

else

>B7=zones.m(-2)

9

=zones(A4)

10

=if(A9.len()==1,xor(A9(1),1),0)

11

=file(config.dataDir/config.dataFilename:(A9|B7))

12

=file(config.dataDir/config.dataFilename:A10)

13

=A11.reset@w(A12:config.blockSize.m(-1))

14

=lock("config")

15

>config=json(file("ubc.json").read())

16

>config.discardZone.insert(0,(A9|B7))

17

>config.discardTime.insert(0,[long(now())]*(A9|B7).len())

18

=file("ubc.json").write(json(config))

19

=lock@u("config")

20

=lock("zones")

21

=zones(A4-1)\B7

22

=(zones(A4)\A9)|A10

23

>zones=zones.(~).modify(A4-1,[A21,A22])

24

=file("zone.btx").export@b(zones)

25

=lock@u("zones")

A5-B8 如果t不为空,则选出t时刻之前完成的分表号,否则选出全部分表号

A9 原主分表号

A10 新主分表号

A11-A13 把原主分表和倒数第二层的分表归并后写入新的主分表

A14-A19 把被归并完的分表号写入弃用列表并记录弃用时刻

A20-A25 更新分表号列表,并把之备份至文件

8.7 query.splx

查询数据时使用,返回所有分表号


A

1

return zones.rvs().conj()

说明:

1、使用复组表后必须关闭,否则merge.splx做完合并操作后无法删除分表

2、使用复组表产生游标时,必须使用@w选项,即cursor@w(),表示分表之间采用更新机制归并


例程代码下载:code.zip