实时追加例程 (分表)

1. 应用场景

本例程适用于具有如下特征的数据表的追加:

对数据追加的实时性要求很高、数据追加周期很短、任意时刻均有可能追加;数据只有追加无删除修改的需求;单次追加的数据量比较小,可以全内存存储。

2. 实现思路

1、 为了尽快完成新数据归并,设计将数据分层存储,新数据先快速写入缓冲区,然后启用另一个线程定期将缓冲区数据写入近期的小分段区间分表,这样的分表很小,能在追加周期内快速完成新老数据的归并

2、 用一个归并线程将小分段区间分表归并成一个较大的分段区间分表,完成即可启用新的大分段区间分表,原有小分段区间分表可删除。分段区间可分成多个层次,逐层向上归并,在分段数量(影响读取性能)和区间大小(影响归并性能)之间取得平衡。比如,数据可以分成日、小时、分钟三层。每10分钟的数据存储成一个分表。一小时结束后,将6个分表归并成一个小时分表。一天结束时,将24个的小时分表归并成一个日分表,新的一天继续生成新的小时分表。完整的数据由一天之前的日分表、一天之内的小时分表和当前小时的分钟分表构成的复组表。

3、 查询时根据时间段参数,选出符合时间段的分表号返回,以便生成复组表用于查询。

3. 约定与概念

1. 获得新数据时立即写入缓冲区,缓冲区文件按传入时间命名。由写出线程将缓冲区数据写入0层分表,已经处理完毕的缓冲区文件将被删除,剩下的数据将写入新的缓冲区文件。

2. 设置混乱期参数,绝大多数混乱期之前的数据在混乱期内都会追加进来。仍允许混乱期之前的数据在混乱期之后再追加进来,但是频率较小。混乱期内的数据不写出也不参与计算。

3. 写出数据采用复组表格式,由多个分表构成,每个分表一个时间区间内的数据。

4. 数据按时间分层存储,0层为从缓冲区直接写出的数据,n层为从n-1层合并出来的数据。

5. 0层外,每个层次有个时间级别,目前先仅支持 日、时、分、秒 四种,不同层的级别不能相同。

6. 每隔相应指定数量个层级别时间后会启用一个新的分表,分表对应的时间区间称层区间。上层区间被分成整数个下层区间, 层区间的时间单位是层级别。

7. 合并模式:n-1层向n层合并的模式为批量合并:某层的分表会有多个,每个区间对应一个;上层区间对应的下层区间分表都准备好之后一起合并成上层区间分表。

8. 最后分表号:每层都有一个最后分表号,这个最后分表号不是物理上存在的,而是根据上一次归并时的(当前时刻-混乱期) 计算出来的n层分表号。本次归并时如果(当前时刻-混乱期) 计算出来的n层分表号大于上次记录的最后分表号,才启动本次归并。这样可以保证每层的归并周期是事先约定的,不会频繁归并。

9. 月累积:日层如果是最高层,其分表可以直接归并入月分表,即直接用月分表号命名,但是最大分表号依旧用日分表号。

10. 写出线程:以混乱期为周期定期执行,把缓冲区中混乱期之前的数据写到0层。

11. 合并线程:以一层层区间为周期定期执行,从0层起逐层把n-1层分表合并成n层。

12. 查询线程:根据时间段参数,选出符合时间段的分表号返回,以便生成复组表用于查询。

13. 分表号计算规则:用分表对应的时间区间起始点命名,每个时间分量对应的二进制位数:年4+4+5+5+6+6+零层1=31位,其中零层的值固定为1,时分秒的分量均加1,即所有分量从1开始计算。上层的分表号根据层级别把右侧所有二进制位的值置0,当前层级别的值整除层区间后再加1

14. 最高层的分表号采用明文方式,如为年月则yyMM00k 的格式;如为年月日则yyMMddk 的格式。其中k为交替位,取值为01

15. 交替位:如果目标分表存在,则和下层归并上来的数据一起归并写入新的分表,新的分表号和旧的分表号最末二进制位为0/1交替,即原目标分表号末位是0,则新分表号为1,反之为0,前面的所有位的值和原目标分表号一致。

16. 最长查询周期:被归并的分表在归并结束时,把该分表号写入弃用分表号列表,并记录弃用时刻,同时从分表号列表里删除。当弃用时长超过最长查询周期时,才把这些分表从硬盘以及弃用列表中删除。该机制可以保证被归并完的分表如果正在被查询,不会立刻被删除,从而引发查询错误。

4. 配置文件说明

配置文件ubc.json

该文件缺省位于集算器的主目录下(如想存到别的目录,请自行修改代码为绝对路径),内容如下:

[{ "choasPeriod":10,

"sortKey":"account,tdate",

"timeField":"tdate",

"otherFields":"ColName1,ColName2",

"bufferDir":"buffer/",

"buffer":[],

"dataDir":"data/",

"dataFilename":"data.ctx",

"queryPeriod":120,

"level":[0,1,2,3,4] ,

"blockSize":[65536,131072,262144,524288,1048576],

"interval":[60,10,2,1],

"lastZone":[null,null,null,null],

"monthCumulate":true,

"discardZone":[],

"discardTime":[]

}]

"choasPeriod":混乱期时间长度,整数,单位秒。

"sortKey":排序字段名,一般要包含时间字段,多个英文逗号分隔。

"timeField":时间字段名。

"otherFields":其它字段名,英文逗号分隔。

"bufferDir":缓冲区备份文件存储路径,相对于主目录。

"buffer":缓冲区备份文件名列表,初始填[],后由程序自动算。

"dataDir":分表存储路径,相对于主目录。

"dataFilename":组表文件名,如"data.ctx"

"queryPeriod": 最长查询周期,单位秒,当被弃用的分表弃用时长超过最长查询周期后,将会被从硬盘上删除。

"level":层级别,取值0,1,2,3,4分别代表0层,秒、分、时、天,除了0层,其它层级别可以跳跃,但是顺序只能从小到大,比如可以配置成[0,3,4], 但是不能配置成[0,2,1,3,4],顺序不能乱;也不能配置成[1,2,3,4],缺了0层。

"blockSize": 为每层分表的区块大小,一般最小设65536,最大设1048576,太小太大都不合适。区块大小是指当分表追加数据的时候,如果分表的容量不够了,每次扩充容量,会扩充一个区块的大小。当读分表数据的时候,不管分表里实际数据有多少,也是每次读一个区块。而分表是按列按区块存储的,一个字段至少需要两个区块来存储,一个区块用于存储块信息,另一个用于存储数据。因此当分表的字段数非常多的时候,如果读数据的时候需要同时读很多字段,此时需要占用的内存是【读的字段数*2区块大小】。区块设小了,占用内存小,但是如果数据量非常大,区块设大点读数快。因此,对于低层分表,由于数据量比较小,所以我们把区块设小点,节约内存;对于高层分表,数据量比较大,我们把区块设大点,读数快。

"interval":层区间,以层级别为单位的时间长度,0层没有,日层只能填1,其它层根据实际需要填,只能是整数。1层填第一个,2层填第二个,其个数比level少一个,因为没有0层。

"lastZone": 每层的最后分表号,没有0层,所以比level少一个,初始全部填null

"monthCumulate":是否把最高层的日分表归并入月分表。

"discardZone":被弃用的分表号列表,其值由程序自动产生,初始配置为[]

"discardTime":被弃用的时刻列表,其值由程序自动产生,初始配置为[]

5. 存储结构

1、 缓冲区的数据结构

由传入的数据组成的排列,保持其原始的数据结构。

2、 文件的存储目录及命名规则

主目录下的文件及子目录:

..

buffer:缓冲区备份文件存储路径,目录名在ubc.json中配置,参见前面介绍

data:分表存储路径,目录名在ubc.json中配置,参见前面介绍

zone.btx:用于存储各级分表号的文件

ubc.json:配置文件

缓冲区备份文件以数据传入时刻命名,格式为"yyyyMMddHHmmss.btx"

data目录下的文件如下所示:

..

文件名由"分表号.组表文件名.ctx"组成,其中组表文件名在ubc.json中配置,参见前面介绍,分表号由程序根据层级别自动计算,分表号的计算规则参见【约定与概念】中的介绍。

6. 配置及存储举例

1. 电商系统

用户数在100-1000万规模,每天记录数在100-500万行,混乱期10分钟。

这个例子数据量较小,混乱期时间较长,设置两层就行,把1层区间定义为2小时, 2层区间定义为1天,monthCumulate设为true,把最高层数据直接归并到月,这样避免产生过多小分表。

其配置文件如下:

[{ "choasPeriod":600,

"sortKey":"uid,etime",

"timeField":"etime",

"otherFields":"eventID,eventType",

"bufferDir":"buffer/",

"buffer":[],

"dataDir":"data/",

"dataFilename":"data.ctx",

"queryPeriod":600,

"level":[0,3,4],

"blockSize":[65536, 524288,1048576],

"interval":[2,1],

"lastZone":[null,null],

"monthCumulate":true,

"discardZone":[],

"discardTime":[]

}]

2. 信号系统

其特点是产生频率快(每一个监测点一秒钟内可产生多条数据)、严重依赖于采集时间(每一条数据均要求对应唯一的时间)、测点多信息量大(常规的实时监测系统均有成千上万的监测点,监测点每秒钟都产生数据,每天产生几十GB甚至更多的数据量)。设备号或者测点名在1-10万规模,数据每秒追加一次,混乱期设为10秒,一天的数据量大约在17亿行左右。

这个例子数据量非常大,每分钟的数据量在100多万行,所以为了能快速地归并,我们把1层的区间设置为60秒,2层的区间设置为10分钟,3层的区间设置为2小时,4层的区间设置为1天。由于信号系统数据量太大,且查询时段通常不超过24小时,所以monthCumulate设为false

由于层级别不能相同,所以我们把1层的层区间设置成60秒,而不是1分钟。

其配置文件如下:

[{ "choasPeriod":10,

"sortKey":"TagName",

"timeField":"time",

"otherFields":"Type,Quality,Value",

"bufferDir":"buffer/",

"buffer":[],

"dataDir":"data/",

"dataFilename":"data.ctx",

"queryPeriod":10,

"level":[0,1,2,3,4],

"blockSize":[65536,131072,262144,524288,1048576],

"interval":[60,10, 2,1],

"lastZone":[null,null,null,null],

"monthCumulate":false,

"discardZone":[],

"discardTime":[]

}]

7. 全局变量

zones: 其结构为序列组成的序列,存储每个层正在用的分表号,其内容和zone.btx一致。

修改配置信息的锁:使用"config" 作为锁名。

修改zones的锁:使用"zones"作为锁名。

8. 代码解析

init.splx

仅在服务器启动时执行一次,服务器运行过程中不再执行,如果服务器是第一次启动,则需要初始化参数,如果是第n次启动,则需要把第n-1次执行结束时写出的分表号读入


A

B

1

=file("zone.btx")

2

if(A1.exists())

=A1.import@bi()

3

else

=json(file("ubc.json").read())

4


>B2=B3.level.len().([])

5

=env(zones,B2)

6

=register("zone","zone.splx")

7

=register("zone0","zone0.splx")

8

=call@n("write.splx")

9

=call@n("merge.splx")

A1 读分表号存储文件

A2-B4 如果分表号文件存在,则读出,否则产生一个长度为层数的由空序列组成的序列

A5 声明全局变量zones,并将分表号序列赋值给它

A6 zone.splx脚本登记为同名函数

A7 zone0.splx脚本登记为同名函数

A8 启动写出线程

A9 启动合并线程

append.splx

用于每次收到新数据时, 立即将新数据写出到缓冲区,输入参数data为新数据序表,写出后将新增的缓冲区文件名添加进配置文件


A

1

=long(now())

2

=lock("config")

3

=json(file("ubc.json").read())

4

=file(A3.bufferDir/A1/".btx").export@b(data)

5

=A3.buffer|=A1

6

=file("ubc.json").write(json(A3))

7

=lock@u("config")

A3 读配置文件

A4 将新数据写入当前时刻命名的缓冲区文件

A5 将新增缓冲区文件名添加进配置文件

备注:传入数据的字段顺序按如下规则:

排序字段包含时间字段:排序字段、其它字段

排序字段不包含时间字段:排序字段、时间字段、其它字段

字段顺序和配置文件中的一致

zone0.splx

根据时间算0层分表号,内部使用。

输入参数:

tm 时间


A

B

1

2023

2

[27,23,18,13,7,1]

3

return [year(tm)-A1+8, month(tm),day(tm),hour(tm)+1,minute(tm)+1,second(tm)+1].sum(shift(~,-A2(#)))+1

A1 数据的起始年份

A2 按年、月、日、时、分、秒的顺序,每个层级别右边的二进制位数

zone.splx

将低层分表号,转成高层分表号,内部使用。

输入参数:

z 低层分表号

n 高层层序号(config.level中的序号,不是层级别)

config 配置文件内容

monthCumulate 是否把最高层的日分表归并入月分表


A

B

1

[27,23,18,13,7,1]


2

23


3

=config.interval(n-1)


4

>p = 7 - config.level(n)


5

>p=if(monthCumulate && p==3,2,p)


6

>b = A1(p)


7

>z1 = shift(z,b)


8

>b1 = A1(p-1)-b


9

>s = (and( z1, shift(1,-b1)-1 )-1) \A3*A3 + 1


10

=shift(shift( shift(z1,b1),-b1)+s, -b)


11

if(p>3 || n<config.level.len())

return A10

12

=and(shift(A10,A1(3)),shift(1,A1(3)-A1(2))-1)


13

=and(shift(A10,A1(2)),shift(1,A1(2)-A1(1))-1)


14

=shift(A10,A1(1))-8+A2


15

return A14*100000+A13*1000+A12*10


A1 按年月日时分秒的顺序,层级别右边的二进制位数

A2 数据的起始年份,只取后两位

A3 高层层区间

A4 根据高层的层级别算出其在A1中的序号

A5 如果monthCumulatetrue且目标层为日层,则p设为2,表示后面截短到月

A6 高层需要截短的位数

A7 分表号截短后的值

A8 截短后最后一级使用的位数

A9 将最后一级的值整除层区间加1

A10 A9替换回A7中最后一级的位,然后将A7被截掉的位补0

A11 如果目标层层级别是小时及以下,或者不是最高层,直接返回A10

如果目标层是日以上的层级别且是最高层,则继续计算短的明文分表号:

A12-A14 分别计算年月日分量,将年恢复成实际的年份

A15 将年月日分量拼起来,末位加上0

write.splx

定时执行,执行周期为混乱期的时长,用于将缓冲区中混乱期之前的数据写出到0层。


A

B

C

1

=lock("config")

2

>config=json(file("ubc.json").read())

3

=long(now())-config.queryPeriod*1000

4

=config.discardTime.pselect@a(~<A3)

5

=config.discardZone(A4).(movefile(config.dataDir/~/"."/config.dataFilename))

6

>config.discardZone.delete(A4),config.discardTime.delete(A4)

7

=file("ubc.json").write(json(config))

8

=lock@u("config")

9

=elapse@s(now(),-config.choasPeriod)

10

=config.buffer

11

=A10.(config.bufferDir/~/".btx")

12

=A11.conj(file(~).import@b()).sort(${config.timeField})

13

=A12.select(${config.timeField}<=A9)

14

=[]

15

for A13.len()>0

16


=A13.${config.timeField}

17


=func(A56,B16)

18


=zone0(B16)

19


=A13.pselect(${config.timeField}>=B17)

20


if(B19==null)

21



>data=A13,A13=[]

22


else

23



>data=A13.to(B19-1)

24



>A13=A13.to(B19,A13.len())

25


=lock("config")

26


>config=json(file("ubc.json").read())

27


if(zones(1).pos(B18) || config.discardZone.pos(B18))

28



goto A15

29


else

>A12=A12\data

30


=lock@u("config")

31


=file(config.dataDir/config.dataFilename:[B18])

32


>data=data.sort(${config.sortKey})

33


=B31.create(${config.sortKey.split@c().("#"+trim(~)).concat@c()},${config.otherFields};;config.blockSize(1))

34


=B33.append@i(data.cursor())

35


=B33.close()

36


>A14.insert(0,B18)

37

if(A14.len()>0)

38


=lock("zones")

39


=(zones(1)|A14).sort()

40


>zones=[B39]|zones.to(2,)

41


=file("zone.btx").export@b(zones)

42


=lock@u("zones")

43

=A10.run(movefile(config.bufferDir/~/".btx"))

44

if(A12.len()>0)

45


=long(A12.${config.timeField})

46


=file(config.bufferDir/B45/".btx").export@b(A12)

47

=lock("config")

48

>config=json(file("ubc.json").read())

49

>config.buffer=config.buffer\A10

50

if(A12.len()>0)

51


>config.buffer=config.buffer|B45

52

=file("ubc.json").write(json(config))

53

=lock@u("config")

54

=sleep(config.choasPeriod*1000)

55

goto A1

56

func

57


=[year(A56),month(A56)-1,day(A56)-1,hour(A56),minute(A56),second(A56)]

58


=7-config.level(2)

59


=config.interval(1)

60


=B57(B58)=B57(B58)\B59*B59+B59

61


>B57.run(if(#>B58,~=0) )

62


=datetime(B57(1),B57(2)+1,B57(3)+1,B57(4),B57(5),B57(6))

63


return B62

A3-A7 选出配置文件中保存的弃用时间超出最长查询周期的弃用分表号,将之从硬盘上删除,并从配置文件中删除

A9 计算混乱期时刻

A10 缓冲区文件名

A11-A12 读缓冲区文件,按时间字段排序

A13 过滤出混乱期时刻之前的数据

A14 用于暂存新产生的0层分表号

A15 如果有满足条件需要写出的缓冲区数据

B16 获得第一条记录的时间

B17 根据B16算出其所属的1层区间的结束时间

B18 B16算出0层分表号

B19 A13中选出第一条大于等于B17的记录序号

B20-C24 选出记录序号小于B19的记录赋给data变量,A13中仅保留记录序号大于等于B19的记录

B25-C30 如果0层或者弃用列表中有待写出的目标分表号,则跳过当前组,否则将当前组数据从A12中删除,然后继续后面的写出操作

B31-B35 datasortKey排序后写入以B18为分表号的分表中

B36 B18追加到A14序列中

A37-B42 A14添加到0层分表号列表并重新排序,将分表号列表写出到备份文件

A43 删除处理过的缓冲区文件

A44-B46 将处理剩下的缓冲区数据写回缓冲区文件

A49 将处理过的缓冲区文件名从配置文件中删除

B51 将新写出的缓冲区文件名添加进配置文件

A56 计算传入时间所属的1层区间的结束时间

merge.splx

定期执行,执行周期为一层层区间时长。用于将低层的数据逐层向高层归并,每次归并完都回到0层重新循环,只有低层无归并操作才往高层循环。


A

B

C

D

E

F

1

=lock("config")

2

>config=json(file("ubc.json").read())

3

=lock@u("config")

4

>tm=elapse@s(now(),-config.choasPeriod)

5

= zone0(tm)

6

=config.level.len()-1

7

for A6

8


>zz = zones(A7)

9


=zone(A5,A7+1,config,false)

10


=config.lastZone(A7)

11


=lock("config")

12


>config=json(file("ubc.json").read())

13


>config.lastZone(A7)=B9

14


=file("ubc.json").write(json(config))

15


=lock@u("config")

16


if zz.len()>0 && B9>B10

17



=zz.group(zone( ~, A7+1,config,false):zu;~:zd)

18



=C17.select(zu<B9)

19



if(config.monthCumulate && A7==A6)

20




>C18=C18.conj(zd).group(zone( ~, A7+1,config,true):zu;~:zd)

21



>zus = zones(A7+1)

22



=[]

=[]

=[]

23



for C18

24




=zus.select@1(xor(~,C23.zu)<=1)

25




=if(D24,xor(D24,1),C23.zu)

26




=lock("config")

27




>config=json(file("ubc.json").read())

28




if((D24!=null && config.level(A7+1)==1)|| config.discardZone.pos(D25))

29





next C23

30




else

>E22.insert(0,C23.zd)

31




=lock@u("config")

32




=file(config.dataDir/config.dataFilename:(D24|C23.zd))

33




=file(config.dataDir/config.dataFilename:D25)

34




=D32.reset(D33:config.blockSize(#A7+1))

35




=lock("config")

36




>config=json(file("ubc.json").read())

37




>config.discardZone.insert(0,(D24|C23.zd))

38




>config.discardTime.insert(0,[long(now())]*(D24|C23.zd).len())

39




=file("ubc.json").write(json(config))

40




=lock@u("config")

41




>C22=C22|D24,D22=D22|D25

42



=lock("zones")

43



=(zones(A7)\E22).sort()

44



=((zones(A7+1)\C22)|D22).sort()

45



>zones=zones.(~).modify(A7,[C43,C44])

46



=file("zone.btx").export@b(zones)

47



=lock@u("zones")

48



if E22.len()>0

49




goto A1

50

=config.interval(1)

51

=sleep(case(config.level(2),2:A50*60,3:A50*3600,4:A50*3600*24;A50)*1000)

52

goto A1











A1-A3 读配置文件

A4 混乱期时刻

A5 用混乱期时刻算出其在0层的分表号

A7 0层开始循环

B8 n-1层的分表号列表

B9 A5算出其在n层的分表号

B10 n层的最后分表号

B11-B15 更新n层的最后分表号

B16 如果n-1层有分表,且混乱期时刻算出的n层分表号大于n层的最后分表号

C17 n-1层分表号按其在n层对应的分表号分组

C18 过滤出n层分表号小于B9的组(表示这些组的所有分表已经准备完毕)

C19 如果monthCumulatetrue且当前是往最高层归并

D20 C18重新用月分表号分组

C21 zones中读出n层已有的分表号,赋给变量zus

C22 暂存需要弃用的分表号

D22 暂存新增的分表号

E22 暂存n-1层被归并成功的分表号

C23 按组循环

D24 zus中读取当前组所属的分表号(含末位01两种)

D25 算出新分表号:如果当前组所属分表号存在,则新分表号交替(原末位1则新末位0;原末位0则新末位1),否则就用当前组新算出的分表号

D26-D31 如果弃用分表号列表包含新分表号,或者当前是往秒层归并且秒层已有目标分表(秒层不能末位交替,会和0层重号),则跳过当前组,否则把当前组分表号添加到E22并继续下面的归并操作

D32-D34 把当前组的分表数据与原n层对应的分表数据,一起写入新分表中

D35-D40 将归并完毕的分表号添加到弃用分表号列表中,并记录弃用时间

D41 将弃用分表号和新增分表号分别添加到C22D22

C42-C47 更新分表号列表,并备份到文件

query.splx

查询数据时使用,返回满足查询条件的分表号列表

输入参数:

start:起始时间

end:结束时间


A

1

>z=zones.m(:-2).rvs().conj()

2

>sz=zone0(start)

3

>ez=zone0(end)

4

>sp = z.pselect@z(~<=sz)

5

>ep = ifn(z.pselect( ~>ez), z.len()+1)

6

=if (sp >= ep,null,z.to( ifn(sp,1), ep-1 ))

7

=lock("config")

8

>config=json(file("ubc.json").read())

9

=lock@u("config")

10

>z=zones.m(-1)

11

>sz=(year(start)%100)*100000+month(start)*1000+if(!config.monthCumulate,day(start)*10,0)

12

>ez=(year(end)%100)*100000+month(end)*1000+if(!config.monthCumulate,day(end)*10,0)+1

13

>sp = ifn(z.pselect@z( ~<sz), 0 )

14

>ep = ifn(z.pselect( ~>ez), z.len()+1)

15

=if (sp >= z.len(),null,z.to(sp+1, ep-1))

16

return A15|A6

A1 将除了最高层以外的所有分表号按时间顺序排列

A2-A3 算出输入参数start\end对应的0层分表号(由于0层分表号的末位本身是1,所以这里不用再加1了)

A4 从后往前找出第一个早于等于起始时间的分表号位置

A5 从前往后找出第一个晚于结束时间的分表号位置

A6 选出位于起止时间之间的分表号,左闭右开;没有则返回null

A7-A9 读配置文件

A10 最高层的分表号

A11-A12 算出输入参数start\end对应的最高层分表号,考虑到交替位的问题,sz的末位置0ez1可以方便写出查找代码

A13 从后往前找出第一个早于起始时间的分表号位置,没有则置0

A14 从前往后找出第一个晚于结束时间的分表号位置

A15 选出位于起止时间之间的分表号;没有则返回null

A16 将高层的查询结果和低层的查询结果合并后返回

说明:使用复组表后必须关闭,否则merge.splx做完合并操作后无法删除分表