实时追加例程—用内存避免小分表

实时追加例程—用内存避免小分表

背景与方法

参考下面文章

实时追加例程 - 乾学院 (raqsoft.com.cn)

该文章中采用多层分表的方式存储实时数据,对于数据追加频繁的场景,为了保证新数据能快速及时写出,采用了较短时间区间的小分表,这会导致文件比较碎,管理比较麻烦。

本例程在上述方法的基础上,再设计了优化方法来存储近期的较少量的数据,即把新数据存储在内存序表中,定期写入时间区间较长的分表,查询的时候,利用组表的append@y()函数,把内存序表模拟成复组表的分表。这种方式的优点是减少较碎的小分表,文件数量少,管理方便,不需要缓冲区,查询时可以返回所有数据,如果查询结果包含近期数据,比分表方式更快一些。

实际使用的时候,可以根据机器性能和数据量及追加频度来选择近期多长时间内的数据驻留在内存。

本文用到的大部分概念,均和上述文章相同,因此只针对不同的部分进行讲解。

测试对比

总数据量:2264万行,4个字段

其中近期数据量:400万行,时间区间一小时

采用分表存储的方法,把近期一小时内的400万行数据全部留在0层,不做归并

采用内存序表的方法,把近期一小时内的400万行数据全部留在内存序表,不写入分表

测试方法分两种:只读最近一小时的,和全读。反复多次再取平均,对比时间。

测试环境:

处理器:11th Gen Intel(R) Core(TM) i7-11800H @ 2.30GHz 2.30 GHz

机身内存:16.0 GB

Java使用的内存:9672m

单位:毫秒



分表存储

内存序表

全读

第一次

6802

5098

第二次

6586

5040

第三次

6530

4983

平均

6639

5040

只读最近一小时

第一次

1106

119

第二次

1122

135

第三次

1217

156

平均

1148

137

配置文件说明

配置文件ubc.json

该文件缺省位于集算器的主目录下(如想存到别的目录,请自行修改代码为绝对路径),内容如下:

[{ "sortKey":"account,tdate",

"timeField":"tdate",

"otherFields":"ColName1,ColName2",

"dataDir":"data/",

"dataFilename":"data.ctx",

"queryPeriod":2,

"level":[3,4],

"blockSize":[524288,1048576],

"interval":[1,1],

"lastZone":[1167851520,null],

"monthCumulate":true,

"discardZone":[],

"discardTime":[]

}]

"level":层级别,取值1,2,3,4分别代表秒、分、时、天,这里只配置了34,表示秒、分这两个级别的数据都留在内存序表,从小时这个层级开始才存储在分表。

存储结构

"dataDir"目录下多了一个backup.btx文件,用于备份内存序表的数据,始终和内存序表保持一致。

配置及存储举例

1. 电商系统

用户数在100-1000万规模,每天记录数在2400万行,每小时100万行。这里把最新1小时以内的数据驻留在内存序表。

其配置文件如下:

[{ "sortKey":"uid,etime",

"timeField":"etime",

"otherFields":"eventID,eventType",

"dataDir":"data/",

"dataFilename":"data.ctx",

"queryPeriod":600,

"level":[3,4],

"blockSize":[524288,1048576],

"interval":[1,1],

"lastZone":[null,null],

"monthCumulate":true,

"discardZone":[],

"discardTime":[]

}]

2. 信号系统

其特点是产生频率快(每一个监测点一秒钟内可产生多条数据)、严重依赖于采集时间(每一条数据均要求对应唯一的时间)、测点多信息量大(常规的实时监测系统均有成千上万的监测点,监测点每秒钟都产生数据,每天产生几十GB甚至更多的数据量)。设备号或者测点名在1-10万规模,记录数每秒追加一次,一天的数据量大约在17亿行左右。

这个例子数据量非常大,每分钟的数据量在100多万行,我们把1层的区间定义为1分钟(最新1分钟以内的数据驻留在内存序表)2层的区间定义为2小时,3层的区间定义为1天。由于信号系统数据量太大,且查询时段通常不超过24小时,所以月累积设为false

其配置文件如下:

[{ "sortKey":"TagName",

"timeField":"time",

"otherFields":"Type,Quality,Value",

"dataDir":"data/",

"dataFilename":"data.ctx",

"queryPeriod":10,

"level":[2,3,4],

"blockSize":[262144,524288,1048576],

"interval":[1, 2,1],

"lastZone":[null,null,null],

"monthCumulate":false,

"discardZone":[],

"discardTime":[]

}]

全局变量

增加全局变量cache:存储内存序表

增加” cache”锁:对cache改写时加锁

代码解析

init.splx

仅在服务器启动时执行一次,过程中不再执行,如果服务器是第一次启动,则需要初始化参数,如果是第n次启动,则需要把第n-1次执行结束时写出的配置信息读入


A

B

1

=json(file("ubc.json").read())

2

=file("zone.btx")

3

if(A2.exists())

=A2.import@bi()

4

else

>B3=A1.level.len().([])

5

=env(zones,B3)

6

=file(A1.dataDir/"backup.btx")

7

if(A6.exists())

=A6.import@b().sort(${A1.sortKey})

8

else

>B7=[]

9

=file(A1.dataDir/A1.dataFilename:0)

10

if(!A9.exists())

=A9.create(${A1.sortKey.split@c().("#"+trim(~)).concat@c()},${A1.otherFields};;A1.blockSize(1))

11


=B10.close()

12

=env(cache,B7)

13

=register("zone","zone.splx")

14

=register("zone0","zone0.splx")

15

=call@n("merge.splx")

A1 读配置文件

A2 读分表号存储文件

A3-B4 如果分表号文件存在,则读出,否则构造一个长度为层数的由空序列组成的序列

A5 将分表号序列设为全局变量zones

A6 内存序表的备份文件

A7-B8 如果内存序表的备份文件存在,则读入并按排序字段排序;不存在则产生一个空的内存序列

A9-B11 产生一个分表号为0的空分表,用于查询时如果只有内存序表,可以与之归并后返回一个组表对象,保证返回接口统一

A12 将内存序表存为全局变量cache

A13 zone.splx脚本登记为同名函数

A14 zone0.splx脚本登记为同名函数(虽然没有0层了,但是所有分表号的计算仍旧以0层分表号为基准,所以仍旧保留了这个函数)

A15 启动合并线程

append.splx

用于每次收到新数据时, 将新数据写出到缓冲区,输入参数data为新数据序表,写出后将新增的缓冲区文件名写入config


A

1

=lock("config")

2

=json(file("ubc.json").read())

3

=lock@u("config")

4

>data=data.sort(${A2.sortKey})

5

=lock("cache")

6

=file(A2.dataDir/"backup.btx").export@ab(data)

7

>cache=[cache,data].merge(${A2.sortKey})

8

=lock@u("cache")

A2 读配置文件

A4 将新数据按排序字段排序

A6 将新数据写入内存备份文件

A7 将新数据按排序字段归并入内存序表

备注:传入数据的字段顺序按如下规则:

排序字段包含时间字段:排序字段、其它字段

排序字段不包含时间字段:排序字段、时间字段、其它字段

其它字段顺序和配置文件中的一致

zone0.splx

根据时间算0层分表号,内部使用。

输入参数:

tm 时间


A

B

1

2023

2

[27,23,18,13,7,1]

3

return [year(tm)-A1+8, month(tm),day(tm),hour(tm)+1,minute(tm)+1,second(tm)+1].sum(shift(~,-A2(#)))+1

A1 数据的起始年份

A2 按年、月、日、时、分、秒的顺序,每个层级别右边的二进制位数

zone.splx

将低层分表号,转成高层分表号,内部使用。

输入参数:

z 低层分表号

n 高层层序号(config.level中的序号,不是层级别)

config 配置文件内容

monthCumulate 是否累积到月


A

B

1

[27,23,18,13,7,1]


2

23


3

=config.interval(n-1)


4

>p = 7 - config.level(n)


5

>p=if(monthCumulate && p==3,2,p)


6

>b = A1(p)


7

>z1 = shift(z,b)


8

>b1 = A1(p-1)-b


9

>s = (and( z1, shift(1,-b1)-1 )-1) \A3*A3 + 1


10

=shift(shift( shift(z1,b1),-b1)+s, -b)


11

if(p>3 || n<config.level.len())

return A10

12

=and(shift(A10,A1(3)),shift(1,A1(3)-A1(2))-1)


13

=and(shift(A10,A1(2)),shift(1,A1(2)-A1(1))-1)


14

=shift(A10,A1(1))-8+A2


15

return A14*100000+A13*1000+A12*10


A1 按年月日时分秒的顺序,层级别右边的二进制位数

A2 数据的起始年份,只取后两位

A3 高层层区间

A4 根据高层的层级别算出其在A1中的位序号

A5 如果需要累积到月且为日层,则p设为2,表示后面截短到月

A6 高层需要截短的位数

A7 分表号截短后的值

A8 截短后最后一级使用的位数

A9 将最后一级的值整除层区间加1

A10 s替换进z中,末尾再补0

A11 如果层级别是小时及以下,或者不是最高层,直接返回A9

如果是日以上的层级别且是最高层,则继续计算短的明文分表号:

A12-A14 分别计算年月日分量,将年恢复成实际的年份

A15 将年月日分量拼起来,末位加上0

write.splx

merge调用执行,merge每次执行前,先调用write.splx,把内存序表的数据写出到1层,然后再往高层归并。


A

B

C

1

=lock("config")

2

>config=json(file("ubc.json").read())

3

=long(now())-config.queryPeriod*1000

4

=config.discardTime.pselect@a(~<A3)

5

=config.discardZone(A4).(movefile(config.dataDir/~/"."/config.dataFilename))

6

>config.discardZone.delete(A4),config.discardTime.delete(A4)

7

=file("ubc.json").write(json(config))

8

=lock@u("config")

9

=func(A58,now())

10

=cache.sort(${config.timeField})

11

=A10.select@b(${config.timeField}<=A9)

12

=[]

=[]

=[]

13

>zus = zones(1)

14

for A11.len()>0

15


=A11.${config.timeField}

16


=func(A58,B15,"end")

17


=zone0(B15,config)

18


=zone(B17, 1,config,false)

19


=A11.pselect(${config.timeField}>=B16)

20


if(B19==null)

21



>data=A11,A11=[]

22


else

23



>data=A11.to(B19-1)

24



>A11=A11.to(B19,A11.len())

25


=zus.select@1(xor(~,B18)<=1)

26


=if(B25,xor(B25,1),B18)

27


=lock("config")

28


>config=json(file("ubc.json").read())

29


if((B25!=null && config.level(1)==1)|| config.discardZone.pos(B26))

30



goto A14

31


else

>C12=C12|data

32


=lock@u("config")

33


=file(config.dataDir/config.dataFilename:B26)

34


>data=data.sort(${config.sortKey})

35


if(B25!=null)

=file(config.dataDir/config.dataFilename:B25)

36



=C35.reset(B33:config.blockSize(1);data)

37



>A12.insert(0,B25)

38


else

=B33.create(${config.sortKey.split@c().("#"+trim(~)).concat@c()},${config.otherFields};;config.blockSize(1))

39



=C38.append@i(data.cursor())

40



=C38.close()

41


>B12.insert(0,B26)

42

if(A12.len()>0)



43


=lock("config")

44


>config=json(file("ubc.json").read())

45


>config.discardZone.insert(0,A12)

46


>config.discardTime.insert(0,[long(now())]*A12.len())

47


=lock@u("config")

48

if(A12.len()>0 || B12.len()>0)

49


=lock("zones")

50


=((zones(1)\A12)|B12).sort()

51


>zones=[B50]|zones.to(2,)

52


=file("zone.btx").export@b(zones)

53


=lock@u("zones")

54

if(C12.len()>0)

=lock("cache")

55


>cache=cache\C12

56


=file(config.dataDir/"backup.btx").export@b(cache)

57


=lock@u("cache")

58

func



59


=[year(A58),month(A58)-1,day(A58)-1,hour(A58),minute(A58),second(A58)]

60


=7-config.level(1)

61


=config.interval(1)

62


>B59(B60)=B59(B60)\B61*B61

63


if(B58=="end")

>B59(B60)=B59(B60)+B61

64


>B59.run(if(#>B60,~=0) )

65


=datetime(B59(1),B59(2)+1,B59(3)+1,B59(4),B59(5),B59(6))

66


return B65

A2 读配置文件

A3-A6 将弃用时长满足最长查询周期的分表删除

A9 当前时间在1层所属区间的起始时间,此时间之前的数据才归并

A10 将内存序表的数据按时间排序

A11 过滤出A9之前的数据

A12 要删的分表号 B12 要加的分表号 C12 已写出的内存数据

A13 1层已有的分表号

A14 如果有满足条件需要写出的内存数据

B15 获得第一条记录的时间

B16 根据B15算出其所属的1层区间的结束时间

B17 B15算出0层分表号

B18 B17算出1层分表号

B19 A11中选出第一条大于等于B16的记录序号

B20-C24 选出小于B16的记录存入data变量中,A11中仅保留大于等于B16的数据

B25 目标分表号在1层中是否存在

B26 计算新的目标分表号

B27-C32 如果1层是秒层且目标分表号已存在,或者弃用列表中有待写出的目标分表号,则跳过当前组,否则将当前组数据存入C12,然后继续后面的写出操作

B33-C40 datasortKey排序后,和原目标分表归并,再写入以B26为分表号的分表中

B41 B26暂存到B12序列中

A42-B47 将要删除的弃用分表号列表A12写入配置文件的弃用列表中

A48-B53 将弃用分表号从全局变量zones中删除,新增分表号写入全局变量

A54-B57 将已写出的数据从内存序表中删除,并同步备份文件

A58 计算当前时间所属的1层区间的起始/结束时间,第二个参数为”end”,则计算结束时间,否则计算起始时间

merge.splx

定期将n-1层的数据合并到n层,每次执行完都回到1层,重新执行write,只有低层无合并操作才往高层循环,全部合并完休眠一层的层区间后,再次执行。


A

B

C

D

E

1

=call("write.splx")

2

=lock("config")

3

>config=json(file("ubc.json").read())

4

=lock@u("config")

5

= zone0(now(),config )

6

=config.level.len()-1

7

for A6

8


>zz = zones(A7)

9


=zone(A5,A7+1,config,false)

10


=config.lastZone(A7)

11


=lock("config")

12


>config=json(file("ubc.json").read())

13


>config.lastZone(A7)=B9

14


=file("ubc.json").write(json(config))

15


=lock@u("config")

16


if zz.len()>0 && B9>B10

17



=zz.group(zone( ~, A7+1,config,false):zu;~:zd)

18



=C17.select(zu<B9)

19



if(config.monthCumulate && A7==A6)

20




>C18=C18.conj(zd).group(zone( ~, A7+1,config,true):zu;~:zd)

21



>zus = zones(A7+1)

22



=[]

=[]

=[]

23



for C18



24




=zus.select@1(xor(~,C23.zu)<=1)

25




=if(D24,xor(D24,1),C23.zu)

26




=lock("config")

27




>config=json(file("ubc.json").read())

28




if(config.discardZone.pos(D25))

29





next C23

30




else

>E22.insert(0,C23.zd)

31




=lock@u("config")

32




=file(config.dataDir/config.dataFilename:D25)

33




=file(config.dataDir/config.dataFilename:(D24|C23.zd))

34




=D33.reset(D32:config.blockSize(#A7+1))

35




=lock("config")

36




>config=json(file("ubc.json").read())

37




>config.discardZone.insert(0,D24|C23.zd)

38




>config.discardTime.insert(0,[long(now())]*(D24|C23.zd).len())

39




=file("ubc.json").write(json(config))

40




=lock@u("config")

41




>C22=C22|D24,D22=D22|D25

42



=lock("zones")

43



=zones(A7)\E22

44



=((zones(A7+1)\C22)|D22).sort()

45



>zones=zones.(~).modify(A7,[C43,C44])

46



=file("zone.btx").export@b(zones)

47



=lock@u("zones")

48



if E22.len()>0

goto A1

49

=config.interval(1)

50

=sleep(case(config.level(1),2:A49*60,3:A49*3600,4:A49*3600*24;A49)*1000)

51

goto A1

A1 执行写出线程,把内存序表数据写出到1

A2-A4 读配置文件

A5 当前时刻在0层的分表号

A7 1层开始循环

B8 n-1层的分表号列表

B9 A5算出其在n层的分表号

B10 n层的最后分表号

B11-B15 更新n层的最后分表号

B16 如果n-1层有分表,且当前时刻算出的n层分表号大于n层的最后分表号

C17 n-1层分表号算出其在n层的分表号,并按其分组

C18 过滤出n层分表号小于B9的组(表示这些组的所有分表已经准备完毕)

C19 如果需要月累积且当前是往最高层归并

D20 C18重新用月分表号分组

C21 zones中读出n层已有的分表号,记为zus

C22 待弃用分表号 D22 新增分表号 E22 归并完毕的分表号

C23 按组循环

D24 zus中读取当前组所属的分表号(含末位01两种)

D25 算出新分表号:如果当前所属分表号存在,则新分表号交替(原末位1则新末位0;原末位0则新末位1),否则就用当前组新算出的分表号

D26-D31 如果config中弃用分表号包含新分表号,则跳过当前组,否则把当前组分表号暂存到E22

D32-D34 把当前组的分表数据合并原分表数据,一起写入新分表中

D35-D40 将写出完毕的分表号写入config的弃用分表号列表中,并记录弃用时间

D41 将弃用分表号和新增分表号分别暂存到C22D22

C42-C47 更新分表号列表,并备份到文件

query.splx

查询数据时使用,将选出的分表和内存序表归并后产生游标返回。

输入参数:

start:起始时间

end:结束时间


A

B

1

>z=zones.m(:-2).rvs().conj()

2

>sz=zone0(start)

3

>ez=zone0(end)

4

>sp = z.pselect@z(~<=sz)

5

>ep = ifn(z.pselect( ~>ez), z.len()+1)

6

=if (sp >= ep,null,z.to( ifn(sp,1), ep-1 ))

7

=if(!sp && cache.len()>0,true,false)

8

=lock("config")

9

>config=json(file("ubc.json").read())

10

=lock@u("config")

11

>z=zones.m(-1)

12

>sz=(year(start)%100)*100000+month(start)*1000+if(!config.monthCumulate,day(start)*10,0)

13

>ez=(year(end)%100)*100000+month(end)*1000+if(!config.monthCumulate,day(end)*10,0)+1

14

>sp = ifn(z.pselect@z( ~<sz), 0 )

15

>ep = ifn(z.pselect( ~>ez), z.len()+1)

16

=if (sp >= z.len(),null,z.to(sp+1, ep-1))

17

=if(A16.len()>0 || A6.len()>0,A16|A6,0)

18

=file(config.dataDir/config.dataFilename:A17).open()

19

if(A7)

return A18.append@y(cache)

20

else

return A18

A1 将除了最高层以外的所有分表号按时间顺序排列

A2-A3 算出输入参数start\end对应的0层分表号(由于0层分表号的末位本身是1,所以这里不用再加1了)

A4 从后往前找出第一个早于等于起始时间的分表号位置

A5 从前往后找出第一个晚于结束时间的分表号位置

A6 选出位于起止时间之间的分表号,左闭右开;没有则返回null

A7 如果查询起始时间早于第一个分表且内存序表有数据,则表示需要输出内存序表

A8-A10 读配置文件

A11 最高层的分表号

A12-A13 算出输入参数start\end对应的高层分表号,考虑到交替位的问题,sz的末位置0ez的末位置1,可以方便写出查找代码

A14 从后往前找出第一个早于起始时间的分表号位置

A15 从前往后找出第一个晚于结束时间的分表号位置

A16 选出位于起止时间之间的分表号;没有则返回null

A17 将高层的查询结果和低层的查询结果合并

A18-B20 如果查询结果包含内存序表,则和组表对象归并后返回;否则直接返回组表对象