实时追加例程

背景与方法

本例程适应场景:数据维护的实时性要求很高、追加周期很短、任意时刻均有可能追加;需要使用复组表分层存储;仅支持追加模式。单次追加数据量比较小,可以用序表。

方法:

1、 为了解决追加周期很短,无法完成数据归并的问题,设计将数据分层存储,新数据先快速写入缓冲区,然后启用另一个线程定期将缓冲区数据写入近期的小分段区间分表,这样的分表很小,能在追加周期内快速完成数据的归并

2、 然后再用另一个归并线程将小分段区间分表归并成一个较大的分段区间分表,完成即可启用新的大分段区间分表,原有小分段区间分表可删除。分段区间可分成多个层次,逐层向上归并,在分段数量(影响读取性能)和区间大小(影响归并性能)之间取得平衡。

3、 比如,数据可以分成日、小时、分钟三层。每10分钟的数据存储成一个分表。一小时结束后,将6个分表归并成一个小时分表。一天结束时,将24个的小时分表归并成一个日分表,新的一天继续生成新的小时分表。完整的数据由一天之前的日分表、一天之内的小时分表和当前小时的分钟分表构成的复组表。

4、 查询时根据时间段参数,选出符合时间段的分表号返回,以便生成复组表用于查询。

约定与概念

1. 获得新数据时立即写入缓冲区,缓冲区文件按读入时间命名和标识。由写出线程将缓冲区数据写入0层分表,已经处理完毕的缓冲区文件将被删除,处理剩下的数据将写入新的缓冲区文件。

2. 设置混乱期参数,绝大多数数据在混乱期之后的时间段内都会追加进来。仍允许混乱期之前的数据再追加进来,但是频率较小。混乱期内的数据不写出也不参与计算。

3. 写出数据采用复组表格式,由多个分表构成,每个分表一个时间区间内的数据。

4. 数据按时间分层存储,0层为从缓冲区直接写出的数据,n层为从n-1层合并出来的数据。

5. 0层外,每个层次有个时间级别,目前先仅支持 日、时、分、秒 五种,不同层的级别不能相同。

6. 每隔相应指定数量个层级别时间后会启用一个新的分表,分表对应的时间区间称层区间;上层区间被分成整数个下层区间。层区间的时间单位是层级别。

7. 合并模式:n-1层向n层合并的模式为批量合并:某层的分表会有多个,每个区间对应一个;上层区间对应的下层区间分表都准备好之后一起合并成上层区间分表。

8. 最后分表号:每层都有一个最后分表号,这个最后分表号不是物理上存在的,而是根据上一次归并时的(当前时刻-混乱期) 计算出来的n层分表号。本次归并时如果(当前时刻-混乱期) 计算出来的n层分表号大于上次记录的最后分表号,才启动本次归并。这样可以保证每层的归并周期是事先约定的,不会频繁归并。

9. 月累积:日层如果是最高层,可以直接累积成月分表,即直接用月分表号命名,但是最大分表号依旧用日分表号。

10. 写出线程:以混乱期为周期定期执行,把缓冲区中混乱期之前的数据写到0层;

11. 合并线程:0层起把n-1层分表合并成n层。

12. 查询线程:根据时间段参数,选出符合时间段的分表号返回,以便生成复组表用于查询。

13. 分表号计算规则:用分表中数据的时间区间起始点命名,每个时间分量对应的二进制位数:年4+4+5+5+6+6+零层1=31位,其中零层的值固定为1,时分秒的分量均加1,即所有分量从1开始计算;上层的分表号根据层级别把右侧所有二进制位的值置0,当前层级别的值整除层区间后再加1

14. 最高层的分表号采用明文方式,如为年月则yyMM00k 的格式;如为年月日则yyMMddk 的格式,其中k为交替位,取值为01

15. 交替位:如果目标分表存在,则和下层归并上来的数据一起归并写入新的分表,新的分表号和旧的分表号最末二进制位为0/1交替,即原目标分表号末位是0,则新分表号为1,反之为0,前面的所有位的值和原目标分表号一致。

16. 查询周期:配置最长查询周期,被归并的分表在归并操作结束时,把该分表号写入弃用分表号列表,并记录弃用时刻,同时从分表号列表里删除。当弃用时长超过最长查询周期时,才把这些分表从硬盘以及弃用列表中删除。该机制可以保证被归并完的分表如果正在被查询,不会立刻被删除,从而引发查询错误。

配置文件说明

配置文件ubc.json

该文件缺省位于集算器的主目录下(如想存到别的目录,请自行修改代码为绝对路径),内容如下:

[{ "choasPeriod":10,

"sortKey":"account,tdate",

"timeField":"tdate",

"otherFields":"ColName1,ColName2",

"bufferDir":"buffer/",

"buffer":[],

"dataDir":"data/",

"dataFilename":"data.ctx",

"queryPeriod":120,

"level":[0,1,2,3,4],

"interval":[60,10,2,1],

"lastZone":[null,null,null,null],

"monthCumulate":true,

"discardZone":[],

"discardTime":[]

}]

"choasPeriod":混乱期时间长度,整数,单位秒。

"sortKey":排序字段名,一般要包含时间字段,多个英文逗号分隔。

"timeField":时间字段名。

"otherFields":其它字段名,英文逗号分隔。

"bufferDir":缓冲区备份文件存储路径,相对于主目录。

"buffer":缓冲区备份文件名列表,初始填[],后由代码自动算。

"dataDir":分表存储路径,相对于主目录。

"dataFilename":组表文件名,如"data.ctx"

"queryPeriod": 最长查询周期,单位秒,当被弃用的分表弃用时长超过最长查询周期后,将会被从硬盘上删除。

"level":层级别,取值0,1,2,3,4分别代表0层,秒、分、时、天,除了0层,其它层级别可以跳跃,但是顺序只能从小到大,比如可以配置成[0,3,4], 但是不能配置成[0,2,1,3,4],顺序不能乱;也不能配置成[1,2,3,4],缺了0层。

"interval":层区间,以层级别为单位的时间长度,0层没有,日层只能填1,其它层根据实际需要填,只能是整数。1层填第一个,2层填第二个,其个数比level少一个,因为没有0层。

"lastZone": 每层的最后分表号,没有0层,所以比level少一个,初始全部填null

"monthCumulate":是否累积到月。

"discardZone":被弃用的分表号列表,内容由代码自动产生,初始配置为[]

"discardTime":被弃用的时刻列表,内容由代码自动产生,初始配置为[]

存储结构

1、 缓冲区的数据结构

由用户传入的数据组成的排列,保持用户原始的数据结构。

2、 数据文件的存储目录及命名规则

主目录下的文件及子目录:

..

buffer:缓冲区备份文件存储路径,目录名在ubc.json中配置,参见前面介绍

data:组表存储路径,目录名在ubc.json中配置,参见前面介绍

ubc.json:配置文件

缓冲区备份文件以读入时刻命名,格式为"yyyyMMddHHmmss.btx"

data目录下的文件如下所示:

..

文件名由"分表号.组表文件名.ctx"组成,其中组表文件名在ubc.json中配置,参见前面介绍,分表号由系统根据层级别自动计算,分表号的计算规则参见第2节介绍。

配置及存储举例

1. 电商系统

用户数在100-1000万规模,每天记录数在100-500万行,混乱期10分钟。

这个例子数据量较小,混乱期时间较长,设置两层就行,把1层定义为2小时, 2层定义为1天,月累积设为true,把最高层数据直接累积到月,这样避免分表太碎。

其配置文件如下:

[{ "choasPeriod":600,

"sortKey":"uid,etime",

"timeField":"etime",

"otherFields":"eventID,eventType",

"bufferDir":"buffer/",

"buffer":[],

"dataDir":"data/",

"dataFilename":"data.ctx",

"queryPeriod":600,

"level":[0,3,4],

"interval":[2,1],

"lastZone":[null,null],

"monthCumulate":true,

"discardZone":[],

"discardTime":[]

}]

2. 信号系统

其特点是产生频率快(每一个监测点一秒钟内可产生多条数据)、严重依赖于采集时间(每一条数据均要求对应唯一的时间)、测点多信息量大(常规的实时监测系统均有成千上万的监测点,监测点每秒钟都产生数据,每天产生几十GB甚至更多的数据量)。设备号或者测点名在1-10万规模,记录数每秒追加一次,混乱期按10秒算,一天的数据量大约在17亿行左右。

这个例子数据量非常大,每分钟的数据量在100多万行,所以为了能快速地归并,我们把1层的区间定义为60秒,2层的区间定义为10分钟,3层的区间定义为2小时,4层的区间定义为1天。由于信号系统数据量太大,且查询时段通常不超过24小时,所以月累积设为false

由于层级别不能相同,所以我们把1层的层区间定义成60秒,而不是1分钟。

其配置文件如下:

[{ "choasPeriod":10,

"sortKey":"TagName",

"timeField":"time",

"otherFields":"Type,Quality,Value",

"bufferDir":"buffer/",

"buffer":[],

"dataDir":"data/",

"dataFilename":"data.ctx",

"queryPeriod":10,

"level":[0,1,2,3,4],

"interval":[60,10, 2,1],

"lastZone":[null,null,null,null],

"monthCumulate":false,

"discardZone":[],

"discardTime":[]

}]

全局变量

zones: 序列的序列,存储每个层正在用的分表号。

配置信息锁:使用"config" 作为锁名。

修改zones的锁:使用"zones"作为锁名。

代码解析

init.splx

仅在服务器启动时执行一次,过程中不再执行,如果服务器是第一次启动,则需要初始化参数,如果是第n次启动,则需要把第n-1次执行结束时写出的配置信息读入


A

B

1

=file("zone.btx")

2

if(A1.exists())

=A1.import@bi()

3

else

=json(file("ubc.json").read())

4


>B2=B3.level.len().([])

5

=env(zones,B2)

6

=register("zone","zone.splx")

7

=register("zone0","zone0.splx")

8

=call@n("write.splx")

9

=call@n("merge.splx")

A1 读分表号存储文件

A2-B4 如果分表号文件存在,则读出,否则构造一个长度为层数的由空序列组成的序列

A5 将分表号序列设为全局变量zones

A6 zone.splx脚本登记为同名函数

A7 zone0.splx脚本登记为同名函数

A8 启动写出线程

A9 启动合并线程

append.splx

用于每次收到新数据时, 将新数据写出到缓冲区,输入参数data为新数据序表,写出后将新增的缓冲区文件名写入config


A

1

=long(now())

2

=lock("config")

3

=json(file("ubc.json").read())

4

=file(A3.bufferDir/A1/".btx").export@b(data)

5

=A3.buffer|=A1

6

=file("ubc.json").write(json(A3))

7

=lock@u("config")

A3 读配置文件

A4 将新数据写入当前时刻命名的缓冲区文件

A5 将新增缓冲区文件名写入config

备注:传入数据的字段顺序按如下规则:

排序字段包含时间字段:排序字段、其它字段

排序字段不包含时间字段:排序字段、时间字段、其它字段

其它字段顺序和配置文件中的一致

zone0.splx

根据时间算0层分表号,内部使用。

输入参数:

tm 时间


A

B

1

2023

2

[27,23,18,13,7,1]

3

return [year(tm)-A1+8, month(tm),day(tm),hour(tm)+1,minute(tm)+1,second(tm)+1].sum(shift(~,-A2(#)))+1

A1 数据的起始年份

A2 按年、月、日、时、分、秒的顺序,每个层级别右边的二进制位数

zone.splx

将低层分表号,转成高层分表号,内部使用。

输入参数:

z 低层分表号

n 高层层序号(config.level中的序号,不是层级别)

config 配置文件内容

monthCumulate 是否累积到月


A

B

1

[27,23,18,13,7,1]


2

23


3

=config.interval(n-1)


4

>p = 7 - config.level(n)


5

>p=if(monthCumulate && p==3,2,p)


6

>b = A1(p)


7

>z1 = shift(z,b)


8

>b1 = A1(p-1)-b


9

>s = (and( z1, shift(1,-b1)-1 )-1) \A3*A3 + 1


10

=shift(shift( shift(z1,b1),-b1)+s, -b)


11

if(p>3 || n<config.level.len())

return A10

12

=and(shift(A10,A1(3)),shift(1,A1(3)-A1(2))-1)


13

=and(shift(A10,A1(2)),shift(1,A1(2)-A1(1))-1)


14

=shift(A10,A1(1))-8+A2


15

return A14*100000+A13*1000+A12*10


A1 按年月日时分秒的顺序,层级别右边的二进制位数

A2 数据的起始年份,只取后两位

A3 高层层区间

A4 根据高层的层级别算出其在A1中的位序号

A5 如果需要累积到月且为日层,则p设为2,表示后面截短到月

A6 高层需要截短的位数

A7 分表号截短后的值

A8 截短后最后一级使用的位数

A9 将最后一级的值整除层区间加1

A10 s替换进z中,末尾再补0

A11 如果层级别是小时及以下,或者不是最高层,直接返回A9

如果是日以上的层级别且是最高层,则继续计算短的明文分表号:

A12-A14 分别计算年月日分量,将年恢复成实际的年份

A15 将年月日分量拼起来,末位加上0

write.splx

定时执行,执行周期为混乱期的时长,用于将缓冲区中混乱期之前的数据写出到0层。


A

B

C

1

=lock("config")

2

>config=json(file("ubc.json").read())

3

=lock@u("config")

4

=elapse@s(now(),-config.choasPeriod)

5

=config.buffer

6

=A5.(config.bufferDir/~/".btx")

7

=A6.conj(file(~).import@b()).sort(${config.timeField})

8

=A7.select(${config.timeField}<=A4)

9

=[]

10

for A8.len()>0

11


=A8.${config.timeField}

12


=func(A49,B11)

13


=zone0(B11,config)

14


=A8.pselect(${config.timeField}>=B12)

15


if(B14==null)

16



>data=A8,A8=[]

17


else


18



>data=A8.to(B14-1)

19



>A8=A8.to(B14,A8.len())

20


if(zones(1).pos(B13) || config.discardZone.pos(B13))

21



next

22


else

>A7=A7\data

23


=file(config.dataDir/config.dataFilename:[B13])

24


>data=data.sort(${config.sortKey})

25


=B23.create(${config.sortKey.split@c().("#"+trim(~)).concat@c()},${config.otherFields})

26


=B25.append@i(data.cursor())

27


=B25.close()

28


>A9.insert(0,B13)

29

if(A9.len()>0)

30


=lock("zones")

31


=(zones(1)|A9).sort()

32


>zones=[B31]|zones.to(2,)

33


=file("zone.btx").export@b(zones)

34


=lock@u("zones")

35

=A5.run(movefile(config.bufferDir/~/".btx"))

36

if(A7.len()>0)

37


=long(A7.${config.timeField})

38


=file(config.bufferDir/B37/".btx").export@b(A7)

39

=lock("config")

40

>config=json(file("ubc.json").read())

41

=long(now())-config.queryPeriod*1000

42

=config.discardTime.pselect@a(~<A41)

43

=config.discardZone(A42).run(movefile(config.dataDir/~/"."/config.dataFilename))

44

>config.discardZone.delete(A42),config.discardTime.delete(A42)

45

>config.buffer=config.buffer\A5

46

if(A7.len()>0)

47


>config.buffer=config.buffer|B37

48

=file("ubc.json").write(json(config))

49

=lock@u("config")

50

=sleep(config.choasPeriod*1000)

51

goto A1

52

func

53


=[year(A52),month(A52)-1,day(A52)-1,hour(A52),minute(A52),second(A52)]

54


=7-config.level(2)

55


=config.interval(1)

56


=B53(B54)=B53(B54)\B55*B55+B55

57


>B53.run(if(#>B54,~=0) )

58


=datetime(B53(1),B53(2)+1,B53(3)+1,B53(4),B53(5),B53(6))

59


return B58








A1-A3 读配置文件

A4 计算混乱期时刻

A5 缓冲区文件名

A6-A7 读缓冲区文件,按时间字段排序

A8 过滤出混乱期时刻之前的数据

A9 用于暂存写出0层的分表号

A10 如果有满足条件需要写出的缓冲区数据

B11 获得第一条记录的时间

B12 根据B11算出其所属的1层区间的结束时间

B13 B11算出0层分表号

B14 A8中选出第一条大于等于B12的记录序号

B15-C19 选出小于B12的记录存入data变量中,A8中仅保留大于等于B12的数据

B20-C22 如果弃用列表中有待写出的目标分表号,则跳过当前组,否则将当前组数据从A7中删除,然后继续后面的写出操作

B23-B27 datasortKey排序后写入以B12为分表号的分表中

B28 B13暂存到A9序列中

A29-B34 A9写入0层分表号列表并重新排序,将分表号列表写出到文件备份

A33 删除处理过的缓冲区文件

A36-B38 将处理剩下的缓冲区数据写回缓冲区文件

A41-A44 选出config中保存的弃用时间超出最长查询周期的弃用分表号,将之从硬盘上删除,并从config中删除

A45 将处理过的缓冲区文件名从config中删除

A46 将重新写出的缓冲区文件名写回config

A52 计算当前时间所属的1层区间的结束时间

merge.splx

定期将n-1层的数据合并到n层,每次执行完都回到0层,只有低层无合并操作才往高层循环,全部合并完休眠一层的层区间后,再次执行。


A

B

C

D

E

F

1

=lock("config")

2

>config=json(file("ubc.json").read())

3

=lock@u("config")

4

>tm=elapse@s(now(),-config.choasPeriod)

5

= zone0(tm,config)

6

=config.level.len()-1

7

for A6

8


>zz = zones(A7)

9


=zone(A5,A7+1,config,false)

10


=config.lastZone(A7)

11


=lock("config")

12


>config=json(file("ubc.json").read())

13


>config.lastZone(A7)=B9

14


=file("ubc.json").write(json(config))

15


=lock@u("config")

16


if zz.len()>0 && B9>B10

17



=zz.group(zone( ~, A7+1,config,false):zu;~:zd)

18



=C17.select(zu<B9)

19



=(C17\C18).conj(zd)

20



if(config.monthCumulate && A7==A6)

21




>C18=C18.conj(zd).group(zone( ~, A7+1,config,true):zu;~:zd)

22



>zus = zones(A7+1)

23



=[]

=[]

24



for C18

25




=zus.select@1(xor(~,C24.zu)<=1)

26




=if(D25,xor(D25,1),C24.zu)

27




if(config.discardZone.pos(D26))

28





>C19.insert(0,C24.zd)

29





next C24

30




=file(config.dataDir/config.dataFilename:(D25|C24.zd))

31




=file(config.dataDir/config.dataFilename:D26)

32




=D30.reset(D31)

33




=lock("config")

34




>config=json(file("ubc.json").read())

35




>config.discardZone.insert(0,(D25|C24.zd))

36




>config.discardTime.insert(0,[long(now())]*(D25|C24.zd).len())

37




=file("ubc.json").write(json(config))

38




=lock@u("config")

39




>C23=C23|D25,D23=D23|D26

40



=C19.sort()

41



=((zones(A7+1)\C23)|D23).sort()

42



=lock("zones")

43



>zones=zones.(~).modify(A7,[C40,C41])

44



=file("zone.btx").export@b(zones)

45



=lock@u("zones")

46



goto A1

47

=config.interval(1)

48

=sleep(case(config.level(2),2:A47*60,3:A47*3600,4:A47*3600*24;A47)*1000)

49

goto A1

A1-A3 读配置文件

A4 混乱期时刻

A5 混乱期时刻在0层的分表号

A7 0层开始循环

B8 n-1层的分表号列表

B9 A5算出其在n层的分表号

B10 n层的最后分表号

B11-B15 更新n层的最后分表号

B16 如果n-1层有分表,且混乱期时刻算出的n层分表号大于n层的最后分表号

C17 n-1层分表号算出其在n层的分表号,并按其分组

C18 过滤出n层分表号小于B9的组(表示这些组的所有分表已经准备完毕)

C19 尚未准备完毕的组(不参与归并,同时把后面跳过的组也暂存到此变量)

C20 如果需要月累积且当前是往最高层归并

D21 C18重新用月分表号分组

C22 zones中读出n层已有的分表号,记为zus

C23 暂存需要弃用的分表号

D23 暂存新增的分表号

C24 按组循环

D25 zus中读取当前组所属的分表号(含末位01两种)

D26 算出新分表号:如果当前所属分表号存在,则新分表号交替(原末位1则新末位0;原末位0则新末位1),否则就用当前组新算出的分表号

D27-D29 如果config中弃用分表号包含新分表号,则跳过当前组,且把当前组分表号暂存到C19

D30-D32 把当前组的分表数据合并原分表数据,一起写入新分表中

D33-D38 将写出完毕的分表号写入config的弃用分表号列表中,并记录弃用时间

D39 将弃用分表号和新增分表号分别暂存到C23D23

C40-C45 更新分表号列表,并备份到文件

query.splx

查询数据时使用,返回复组表分表号

输入参数:

start:起始时间

end:结束时间


A

B

1

>z=zones.m(:-2).rvs().conj()

2

>sz=zone0(start)

3

>ez=zone0(end)

4

>sp = z.pselect@z(~<=sz)

5

>ep = ifn(z.pselect( ~>ez), z.len()+1)

6

=if (sp >= ep,null,z.to( ifn(sp,1), ep-1 ))

7

if sp!=null

return A6

8

>z=zones.m(-1)

9

>sz=((year(start)%100)*10000+month(start)*100+day(start))*10+1

10

>ez=((year(end)%100)*10000+month(end)*100+day(end))*10+1

11

>sp = ifn(z.pselect@z( ~<=sz), 1 )

12

>ep = ifn(z.pselect( ~>ez), z.len()+1)

13

=if (sp >= ep,null,z.to( sp, ep-1))

14

return A13|A6

A1 将除了最高层以外的所有分表号按时间顺序排列

A2-A3 算出输入参数start\end对应的0层分表号(由于0层分表号的末位本身是1,所以这里不用再加1了)

A4 从后往前找出第一个早于等于起始时间的分表号位置

A5 从前往后找出第一个晚于结束时间的分表号位置

A6 选出位于起止时间之间的分表号,左闭右开;没有则返回null

A7 如果sp不是null,说明查询区间涵盖在低层分表号内,不必查最高层,直接返回A6

A8 最高层的分表号

A9-A10 算出输入参数start\end对应的高层分表号,考虑到交替位的问题,sz的末位置1可以方便写出查找代码

A11 从后往前找出第一个早于等于起始时间的分表号位置

A12 从前往后找出第一个晚于结束时间的分表号位置

A13 选出位于起止时间之间的分表号,左闭右开;没有则返回null

A14 将高层的查询结果和低层的查询结果合并后返回

说明:使用复组表后必须关闭,否则merge.splx做完合并操作后无法删除分表


例程代码下载:[code.zip]

以下是广告时间

对润乾产品感兴趣的小伙伴,一定要知道软件还能这样卖哟性价比还不过瘾? 欢迎加入好多乾计划。
这里可以低价购买软件产品,让已经亲民的价格更加便宜!
这里可以销售产品获取佣金,赚满钱包成为土豪不再是梦!
这里还可以推荐分享抢红包,每次都是好几块钱的巨款哟!
来吧,现在就加入,拿起手机扫码,开始乾包之旅



嗯,还不太了解好多乾?
猛戳这里
玩转好多乾