复组表定期维护例程
背景与方法
本例程适应场景:数据维护的实时性不高,可以在指定时刻定期执行,一般数小时或天为单位;总数据量很大,需要拆成多个分表存储;支持追加和更新两种模式。每次数据维护量可能较大,可以使用游标传入。
方法:
追加模式:传入的数据要求已经按时间字段有序,将接收到的数据按当前分表的时间区间拆成多个临时分表存储,然后将临时分表和对应的当前分表归并,写入新的分表。归并结束后,将当前分表指向新产生的分表,对外提供查询服务。原当前分表放入弃用列表,下次更新前删除。本例程支持数据补漏,传入之前遗漏的早期数据也可以正确插入到合适的分表中。
查询时根据查询时间参数,选出涵盖查询区间的分表号列表返回。
更新模式:传入的数据要求按组表的主键字段有序,将接收到的数据写入以当前时刻为标识的临时分表,然后将临时分表和对应时间段的当前分表归并,写入新的分表。其它动作和追加模式相同。
更新模式有个特殊的处理:设计了一个主分表用于存储很早期的历史数据,并提供一个手动调用模块,将某个时刻之前的分表归并入主分表。
查询时,返回主分表号和所有当前分表号列表。
约定与概念
1. 获得新数据时立即写入临时分表。
2. 写出数据采用复组表格式,由多个分表构成,每个分表一个时间区间内的数据。
3. 每隔一个时间区间会启用一个新的分表,一个分表对应一个时间区间。分表的时间区间目前仅支持 1月、1日两种,分别用字母m和d代表。
4. 写出线程:由外部程序主动调用本线程,以游标形式传入数据,先将数据写到临时分表;然后将临时分表归并到相应的当前分表。如为追加模式,要求传入的数据按时间字段有序;如为更新模式,则传入的数据按主键字段有序。
5. 手动归并线程:如为更新模式,将提供一个手动归并的程序,支持将指定时刻之前生成的分表归并到主分表。
6. 查询线程:如为追加模式,则根据时间段参数,选出符合时间段的分表号返回,以便生成复组表用于查询。如为更新模式,则返回主分表和所有的分表号。
7. 分表号的定义规则:如时间级别为月则yyMM00k 的格式;如为日则yyMMddk 的格式,其中k为交替位,取值为0或1
8. 交替位:如果目标分表存在,则和归并上来的数据一起归并写入新的分表,新的分表号和旧的分表号最末二进制位为0/1交替,即原目标分表号末位是0,则新分表号为1,反之为0,前面的所有位的值和原目标分表号一致。
配置文件说明
配置文件ubc.json
该文件缺省位于集算器的主目录下(如想存到别的目录,请自行修改代码为绝对路径),内容如下:
[{ "addOnly":true, "sortKey":"account,tdate", "timeField":"tdate", "deleteField":null, "otherFields":"ColName1,ColName2", "dataDir":"data/", "dataFilename":"data.ctx", "interval":"d", "currentZone":0, "backupZone":1, "discardZone":[] }] |
"addOnly": 是否仅追加,取值true/false。
"sortKey": 如为追加模式,则为排序字段名,一般要包含时间字段。如为更新模式,则为主键字段名,同时要求数据按主键字段排好序。多个英文逗号分隔。
"timeField":时间字段名,如果为追加模式,必须配置此字段,更新模式则置为null。
"deleteField": 删除标识字段,如果更新模式,需要删除则配上此字段,不需要删除就配置null
"otherFields":其它字段名,英文逗号分隔。
"dataDir":组表存储路径,相对于主目录。
"dataFilename":组表文件名,如"data.ctx"。
"interval":分表的时间区间,m表示1月,d表示1日。
"currentZone": 正在使用的主分表号,如0。如为追加模式,则不用配置。
"backupZone": 备份的主分表号,如1。如为追加模式,则不用配置。
"discardZone":被弃用的分表号列表,内容由代码自动产生,初始配置为[]。
存储结构
主目录下的文件及子目录:
data:组表存储路径,目录名在ubc.json中配置,参见前面介绍
ubc.json:配置文件
data目录下的文件如下所示:
文件名由"分表号.组表文件名.ctx"组成,其中组表文件名在ubc.json中配置,参见前面介绍,分表号由系统根据层级别自动计算,分表号的计算规则参见第2节介绍。
配置及存储举例
电商系统:
用户数在50-100万规模,每天记录数在50万-100万行。
这个例子数据量较小,时间区间设置为1月。
其配置文件如下:
[{ "sortKey":"uid,etime",
"timeField":"etime",
"otherFields":"eventID,eventType",
"dataDir":"data/",
"dataFilename":"data.ctx",
"interval":"m",
"currentZone":0,
"backupZone":1,
"addOnly":true,
"discardZone":[],
}]
全局变量
zones: 分表号列表。
current: 当前主分表号,更新模式才有此变量。
代码解析
init.splx
仅在服务器启动时执行一次,过程中不再执行,如果服务器是第一次启动,则需要初始化参数。
A | B | C | |
1 | >config=json(file("ubc.json").read()) | ||
2 | if(!config.addOnly) | ||
3 | =file(config.dataDir/config.dataFilename:config.currentZone) | ||
4 | if(!B3.exists()) | ||
5 | =config.sortKey.split@c().("#"+trim(~)).concat@c() | ||
6 | =if(config.deleteField, B3.create@d(${C5},${config.deleteField},${config.otherFields}), B3.create(${C5},${config.otherFields}) ) | ||
7 | =C6.close() | ||
8 | >env(current,config.currentZone) | ||
9 | =file("zone.btx") | ||
10 | if(A9.exists()) | =A9.import@bi() | |
11 | else | >B10=[] | |
12 | =env(zones,B10) |
A1 读分表号存储文件
A2 如果是更新模式
B4 如果当前主分表不存在
C5-C7 构造一个空的主分表
B8 将当前主分表号设置为全局变量
A9-B11 如果分表号文件存在,则读出,否则构造一个空序列
A12 将分表号序列设为全局变量zones
write.splx
被外部程序调用执行,传入游标数据,先将数据写出到临时分表,再归并入正式分表。如为追加模式,要求传入的游标数据按时间字段排好序;如为更新模式,则传入的游标数据按主键字段排好序。
A | B | C | D | |
1 | >config=json(file("ubc.json").read()) | |||
2 | =config.discardZone.run(movefile(config.dataDir/~/"."/config.dataFilename)) | |||
3 | >config.discardZone=[] | |||
4 | =[] | |||
5 | =config.sortKey.split@c().("#"+trim(~)).concat@c() | |||
6 | if(!config.addOnly) | |||
7 | =(year(now())%100)*10000+month(now())*100+day(now()) | |||
8 | =file(config.dataDir/config.dataFilename:B7) | |||
9 | =if(config.deleteField, B8.create@d(${A5},${config.deleteField},${config.otherFields}), B8.create(${A5},${config.otherFields}) ) | |||
10 | =B9.append(cs) | |||
11 | =B9.close() | |||
12 | >A4.insert(0,B7) | |||
13 | else | for | ||
14 | =cs.fetch@0(1).${config.timeField} | |||
15 | if(!C14) | break B13 | ||
16 | =func(A40,C14) | |||
17 | =(year(C14)%100)*10000+month(C14)*100+day(C14) | |||
18 | >data=cs.fetch(;${config.timeField}>=C16) | |||
19 | if(!data) | break B13 | ||
20 | =file(config.dataDir/config.dataFilename:[C17]) | |||
21 | >data=data.sort(${config.sortKey}) | |||
22 | =C20.create(${A5},${config.otherFields}) | |||
23 | =C22.append@i(data.cursor()) | |||
24 | =C22.close() | |||
25 | >A4.insert(0,C17) | |||
26 | =A4.group(if(config.interval=="m",(~\100)*1000,~*10):zu;~:zd) | |||
27 | =[] | =[] | ||
28 | for A26 | |||
29 | =zones.select@1(xor(~,A28.zu)<=1) | |||
30 | =if(B29,xor(B29,1),A28.zu) | |||
31 | =file(config.dataDir/config.dataFilename:(B29|A28.zd)) | |||
32 | =file(config.dataDir/config.dataFilename:B30) | |||
33 | =B31.reset${if(config.addOnly,"","@w")}(B32) | |||
34 | >A28.zd.run(movefile(config.dataDir/~/"."/config.dataFilename)) | |||
35 | >config.discardZone.insert(0,B29) | |||
36 | >A27=A27|B29,B27=B27|B30 | |||
37 | =file("ubc.json").write(json(config)) | |||
38 | >zones=((zones\A27)|B27).sort() | |||
39 | =file("zone.btx").export@b(zones) | |||
40 | func | |||
41 | =[year(A40),month(A40)-1,day(A40)-1,hour(A40),minute(A40),second(A40)] | |||
42 | =if(config.interval=="m",2,3) | |||
43 | =B41(B42)=B41(B42)+1 | |||
44 | >B41.run(if(#>B42,~=0) ) | |||
45 | =datetime(B41(1),B41(2)+1,B41(3)+1,B41(4),B41(5),B41(6)) | |||
46 | return B45 |
A1-A3 读配置文件,删除弃用分表
A4 用来存储临时分表号
A6 如果是更新模式
B7 用当前时刻算出临时分表号
B8-B11 将游标数据写入临时分表
B12 记录临时分表
A13 如果是追加模式
B13 循环
C14 获得第一条记录的时间
C15 如果读不到数据则结束循环
C16 根据第一条记录的时间算出其所属的分表的结束时间
C17 用第一条记录的时间算出临时分表号
C18 从游标中选出小于C16的记录存入data变量中
C20-C24 将data按sortKey排序后写入以C17为分表号的临时分表中
C25 将C17暂存到A4序列中
A26 将临时分表号按其所属的正式分表号分组
A28 按组循环
B29 从正式分表号列表中读出当前组所属的分表号
B30 如果B29存在,则使用交替尾号作为新的分表号
B31-B33 将临时分表和原目标分表(如果存在)归并后写入新的交替分表号中
B34 删除临时分表
B35 原分表号B29(如果存在)写入弃用分表号列表
B36 记录原分表号和新分表号
A38 从分表号列表中删除原分表号,增加新分表号
A40 计算传入时间所属的分表区间的结束时间
manualMerge
手动调用此程序,将分表归并入主分表号,仅适用于更新模式
输入参数:
t 将此时刻之前产生的分表归并入主分表
A | B | |
1 | >config=json(file("ubc.json").read()) | |
2 | if t!=null | |
3 | =(year(t)%100)*100000+month(t)*1000+day(t)*10 | |
4 | =zones.select(~<B3) | |
5 | else | >B4=zones |
6 | =file(config.dataDir/config.dataFilename:(config.currentZone|B4)) | |
7 | =file(config.dataDir/config.dataFilename:config.backupZone) | |
8 | =A6.reset@w(A7) | |
9 | >config=json(file("ubc.json").read()) | |
10 | >config.discardZone.insert(0,(config.currentZone|B4)) | |
11 | >config.currentZone=config.backupZone,config.backupZone=current | |
12 | >current=config.currentZone | |
13 | =file("ubc.json").write(json(config)) | |
14 | >zones=zones\B4 | |
15 | =file("zone.btx").export@b(zones) |
A2-B5 如果t不为空,则选出t时刻之前完成的分表号,否则选出全部分表号
A6-A8 把当前主分表和选出的分表归并后写入备份主分表
A10 把当前主分表号和选出的分表号写入弃用列表
A11-A12 将当前主分表号和备份分表号对调
A14-A15 更新分表号列表,并把之备份至文件
query.splx
查询数据时使用,返回复组表分表号
追加模式的输入参数:
start:起始时间
end:结束时间
更新模式无输入参数
A | B | |
1 | if ifv(current) | return current|zones |
2 | >sz=((year(start)%100)*10000+month(start)*100+day(start))*10+1 | |
3 | >ez=((year(end)%100)*10000+month(end)*100+day(end))*10+1 | |
4 | >sp = ifn(zones.pselect@z( ~<=sz), 1 ) | |
5 | >ep = ifn(zones.pselect( ~>ez), zones.len()+1) | |
6 | =if (sp >= ep,null,zones.to( sp, ep-1)) | |
7 | return A6 |
A1 如果存在current,说明是更新模式,则返回所有分表号
A2-A3 算出输入参数start\end所属的分表号,考虑到交替位的问题,末位置1可以方便写出查找代码
A4 从后往前找出第一个早于等于起始时间的分表号位置
A5 从前往后找出第一个晚于结束时间的分表号位置
A6 选出位于起止时间之间的分表号,左闭右开;没有则返回null
说明:
1、 使用复组表后必须关闭,否则归并操作后无法删除分表
2、 如为更新模式,则使用复组表产生游标时,必须使用@w选项,即cursor@w(),表示分表之间采用更新机制归并
英文版