复组表定期维护例程
背景与方法
本例程适应场景:数据维护的实时性不高,可以在指定时刻定期执行,一般数小时或天为单位;总数据量很大,需要拆成多个分表存储;支持追加和更新两种模式。每次数据维护量可能较大,可以使用游标传入。
方法:
追加模式:传入的数据要求已经按时间字段有序,将接收到的数据按当前分表的时间区间拆成多个临时分表存储,然后将临时分表和对应的当前分表归并,写入新的分表。归并结束后,将当前分表指向新产生的分表,对外提供查询服务。原当前分表放入弃用列表,下次更新前删除。本例程支持数据补漏,传入之前遗漏的早期数据也可以正确插入到合适的分表中。
查询时根据查询时间参数,选出涵盖查询区间的分表号列表返回。
更新模式:传入的数据要求按组表的主键字段有序,将接收到的数据写入以当前时刻为标识的临时分表,然后将临时分表和对应时间段的当前分表归并,写入新的分表。其它动作和追加模式相同。
更新模式有个特殊的处理:设计了一个主分表用于存储很早期的历史数据,并提供一个手动调用模块,将某个时刻之前的分表归并入主分表。
查询时,返回主分表号和所有当前分表号列表。
约定与概念
1. 获得新数据时立即写入临时分表。
2. 写出数据采用复组表格式,由多个分表构成,每个分表一个时间区间内的数据。
3. 每隔一个时间区间会启用一个新的分表,一个分表对应一个时间区间。分表的时间区间目前仅支持 1月、1日两种,分别用字母m和d代表。
4. 写出线程:由外部程序主动调用本线程,以游标形式传入数据,先将数据写到临时分表;然后将临时分表归并到相应的当前分表。如为追加模式,要求传入的数据按时间字段有序;如为更新模式,则传入的数据按主键字段有序。
5. 手动归并线程:如为更新模式,将提供一个手动归并的程序,支持将指定时刻之前生成的分表归并到主分表。
6. 查询线程:如为追加模式,则根据时间段参数,选出符合时间段的分表号返回,以便生成复组表用于查询。如为更新模式,则返回主分表和所有的分表号。
7. 分表号的定义规则:如时间级别为月则yyMM00k 的格式;如为日则yyMMddk 的格式,其中k为交替位,取值为0或1
8. 交替位:如果目标分表存在,则和归并上来的数据一起归并写入新的分表,新的分表号和旧的分表号最末二进制位为0/1交替,即原目标分表号末位是0,则新分表号为1,反之为0,前面的所有位的值和原目标分表号一致。
配置文件说明
配置文件ubc.json
该文件缺省位于集算器的主目录下(如想存到别的目录,请自行修改代码为绝对路径),内容如下:
[{ "addOnly":true, "sortKey":"account,tdate", "timeField":"tdate", "deleteField":null, "otherFields":"ColName1,ColName2", "dataDir":"data/", "dataFilename":"data.ctx", "interval":"d", "currentZone":0, "backupZone":1, "discardZone":[] }] |
"addOnly": 是否仅追加,取值true/false。
"sortKey": 如为追加模式,则为排序字段名,一般要包含时间字段。如为更新模式,则为主键字段名,同时要求数据按主键字段排好序。多个英文逗号分隔。
"timeField":时间字段名,如果为追加模式,必须配置此字段,更新模式则置为null。
"deleteField": 删除标识字段,如果更新模式,需要删除则配上此字段,不需要删除就配置null
"otherFields":其它字段名,英文逗号分隔。
"dataDir":组表存储路径,相对于主目录。
"dataFilename":组表文件名,如"data.ctx"。
"interval":分表的时间区间,m表示1月,d表示1日。
"currentZone": 正在使用的主分表号,如0。如为追加模式,则不用配置。
"backupZone": 备份的主分表号,如1。如为追加模式,则不用配置。
"discardZone":被弃用的分表号列表,内容由代码自动产生,初始配置为[]。
存储结构
主目录下的文件及子目录:
data:组表存储路径,目录名在ubc.json中配置,参见前面介绍
ubc.json:配置文件
data目录下的文件如下所示:
文件名由"分表号.组表文件名.ctx"组成,其中组表文件名在ubc.json中配置,参见前面介绍,分表号由系统根据层级别自动计算,分表号的计算规则参见第2节介绍。
配置及存储举例
电商系统:
用户数在50-100万规模,每天记录数在50万-100万行。
这个例子数据量较小,时间区间设置为1月。
其配置文件如下:
[{ "sortKey":"uid,etime",
"timeField":"etime",
"otherFields":"eventID,eventType",
"dataDir":"data/",
"dataFilename":"data.ctx",
"interval":"m",
"currentZone":0,
"backupZone":1,
"addOnly":true,
"discardZone":[],
}]
全局变量
zones: 分表号列表。
current: 当前主分表号,更新模式才有此变量。
代码解析
init.splx
仅在服务器启动时执行一次,过程中不再执行,如果服务器是第一次启动,则需要初始化参数。
A |
B |
C |
|
1 |
>config=json(file("ubc.json").read()) |
||
2 |
if(!config.addOnly) |
||
3 |
=file(config.dataDir/config.dataFilename:config.currentZone) |
||
4 |
if(!B3.exists()) |
||
5 |
=config.sortKey.split@c().("#"+trim(~)).concat@c() |
||
6 |
=if(config.deleteField, B3.create@d(${C5},${config.deleteField},${config.otherFields}), B3.create(${C5},${config.otherFields}) ) |
||
7 |
=C6.close() |
||
8 |
>env(current,config.currentZone) |
||
9 |
=file("zone.btx") |
||
10 |
if(A9.exists()) |
=A9.import@bi() |
|
11 |
else |
>B10=[] |
|
12 |
=env(zones,B10) |
A1 读分表号存储文件
A2 如果是更新模式
B4 如果当前主分表不存在
C5-C7 构造一个空的主分表
B8 将当前主分表号设置为全局变量
A9-B11 如果分表号文件存在,则读出,否则构造一个空序列
A12 将分表号序列设为全局变量zones
write.splx
被外部程序调用执行,传入游标数据,先将数据写出到临时分表,再归并入正式分表。如为追加模式,要求传入的游标数据按时间字段排好序;如为更新模式,则传入的游标数据按主键字段排好序。
A |
B |
C |
D |
|
1 |
>config=json(file("ubc.json").read()) |
|||
2 |
=config.discardZone.run(movefile(config.dataDir/~/"."/config.dataFilename)) |
|||
3 |
>config.discardZone=[] |
|||
4 |
=[] |
|||
5 |
=config.sortKey.split@c().("#"+trim(~)).concat@c() |
|||
6 |
if(!config.addOnly) |
|||
7 |
=(year(now())%100)*10000+month(now())*100+day(now()) |
|||
8 |
=file(config.dataDir/config.dataFilename:B7) |
|||
9 |
=if(config.deleteField, B8.create@d(${A5},${config.deleteField},${config.otherFields}), B8.create(${A5},${config.otherFields}) ) |
|||
10 |
=B9.append(cs) |
|||
11 |
=B9.close() |
|||
12 |
>A4.insert(0,B7) |
|||
13 |
else |
for |
||
14 |
=cs.fetch@0(1).${config.timeField} |
|||
15 |
if(!C14) |
break B13 |
||
16 |
=func(A40,C14) |
|||
17 |
=(year(C14)%100)*10000+month(C14)*100+day(C14) |
|||
18 |
>data=cs.fetch(;${config.timeField}>=C16) |
|||
19 |
if(!data) |
break B13 |
||
20 |
=file(config.dataDir/config.dataFilename:[C17]) |
|||
21 |
>data=data.sort(${config.sortKey}) |
|||
22 |
=C20.create(${A5},${config.otherFields}) |
|||
23 |
=C22.append@i(data.cursor()) |
|||
24 |
=C22.close() |
|||
25 |
>A4.insert(0,C17) |
|||
26 |
=A4.group(if(config.interval=="m",(~\100)*1000,~*10):zu;~:zd) |
|||
27 |
=[] |
=[] |
||
28 |
for A26 |
|||
29 |
=zones.select@1(xor(~,A28.zu)<=1) |
|||
30 |
=if(B29,xor(B29,1),A28.zu) |
|||
31 |
=file(config.dataDir/config.dataFilename:(B29|A28.zd)) |
|||
32 |
=file(config.dataDir/config.dataFilename:B30) |
|||
33 |
=B31.reset${if(config.addOnly,"","@w")}(B32) |
|||
34 |
>A28.zd.run(movefile(config.dataDir/~/"."/config.dataFilename)) |
|||
35 |
>config.discardZone.insert(0,B29) |
|||
36 |
>A27=A27|B29,B27=B27|B30 |
|||
37 |
=file("ubc.json").write(json(config)) |
|||
38 |
>zones=((zones\A27)|B27).sort() |
|||
39 |
=file("zone.btx").export@b(zones) |
|||
40 |
func |
|||
41 |
=[year(A40),month(A40)-1,day(A40)-1,hour(A40),minute(A40),second(A40)] |
|||
42 |
=if(config.interval=="m",2,3) |
|||
43 |
=B41(B42)=B41(B42)+1 |
|||
44 |
>B41.run(if(#>B42,~=0) ) |
|||
45 |
=datetime(B41(1),B41(2)+1,B41(3)+1,B41(4),B41(5),B41(6)) |
|||
46 |
return B45 |
A1-A3 读配置文件,删除弃用分表
A4 用来存储临时分表号
A6 如果是更新模式
B7 用当前时刻算出临时分表号
B8-B11 将游标数据写入临时分表
B12 记录临时分表
A13 如果是追加模式
B13 循环
C14 获得第一条记录的时间
C15 如果读不到数据则结束循环
C16 根据第一条记录的时间算出其所属的分表的结束时间
C17 用第一条记录的时间算出临时分表号
C18 从游标中选出小于C16的记录存入data变量中
C20-C24 将data按sortKey排序后写入以C17为分表号的临时分表中
C25 将C17暂存到A4序列中
A26 将临时分表号按其所属的正式分表号分组
A28 按组循环
B29 从正式分表号列表中读出当前组所属的分表号
B30 如果B29存在,则使用交替尾号作为新的分表号
B31-B33 将临时分表和原目标分表(如果存在)归并后写入新的交替分表号中
B34 删除临时分表
B35 原分表号B29(如果存在)写入弃用分表号列表
B36 记录原分表号和新分表号
A38 从分表号列表中删除原分表号,增加新分表号
A40 计算传入时间所属的分表区间的结束时间
manualMerge
手动调用此程序,将分表归并入主分表号,仅适用于更新模式
输入参数:
t 将此时刻之前产生的分表归并入主分表
A |
B |
|
1 |
>config=json(file("ubc.json").read()) |
|
2 |
if t!=null |
|
3 |
=(year(t)%100)*100000+month(t)*1000+day(t)*10 |
|
4 |
=zones.select(~<B3) |
|
5 |
else |
>B4=zones |
6 |
=file(config.dataDir/config.dataFilename:(config.currentZone|B4)) |
|
7 |
=file(config.dataDir/config.dataFilename:config.backupZone) |
|
8 |
=A6.reset@w(A7) |
|
9 |
>config=json(file("ubc.json").read()) |
|
10 |
>config.discardZone.insert(0,(config.currentZone|B4)) |
|
11 |
>config.currentZone=config.backupZone,config.backupZone=current |
|
12 |
>current=config.currentZone |
|
13 |
=file("ubc.json").write(json(config)) |
|
14 |
>zones=zones\B4 |
|
15 |
=file("zone.btx").export@b(zones) |
A2-B5 如果t不为空,则选出t时刻之前完成的分表号,否则选出全部分表号
A6-A8 把当前主分表和选出的分表归并后写入备份主分表
A10 把当前主分表号和选出的分表号写入弃用列表
A11-A12 将当前主分表号和备份分表号对调
A14-A15 更新分表号列表,并把之备份至文件
query.splx
查询数据时使用,返回复组表分表号
追加模式的输入参数:
start:起始时间
end:结束时间
更新模式无输入参数
A |
B |
|
1 |
if ifv(current) |
return current|zones |
2 |
>sz=((year(start)%100)*10000+month(start)*100+day(start))*10+1 |
|
3 |
>ez=((year(end)%100)*10000+month(end)*100+day(end))*10+1 |
|
4 |
>sp = ifn(zones.pselect@z( ~<=sz), 1 ) |
|
5 |
>ep = ifn(zones.pselect( ~>ez), zones.len()+1) |
|
6 |
=if (sp >= ep,null,zones.to( sp, ep-1)) |
|
7 |
return A6 |
A1 如果存在current,说明是更新模式,则返回所有分表号
A2-A3 算出输入参数start\end所属的分表号,考虑到交替位的问题,末位置1可以方便写出查找代码
A4 从后往前找出第一个早于等于起始时间的分表号位置
A5 从前往后找出第一个晚于结束时间的分表号位置
A6 选出位于起止时间之间的分表号,左闭右开;没有则返回null
说明:
1、 使用复组表后必须关闭,否则归并操作后无法删除分表
2、 如为更新模式,则使用复组表产生游标时,必须使用@w选项,即cursor@w(),表示分表之间采用更新机制归并
英文版