定期小量主动更新例程
组表是SPL重要的文件存储格式。组表文件不支持在读取时同时写入,为保证高性能,组表还常常要求数据有序存储。数据不是一成不变的,还需要持续追加或更新,新产生数据的次序通常和组表要求的次序也不同。这个时候,如何在维护组表数据时不影响正在进行的查询以及保持数据的有序就是个问题。本例程集示例在不同场景下对组表进行数据维护时如何解决该问题。
应用场景的一些基本特征:
实时:可能在任何时刻进行数据维护。
定期:只会在特定时刻维护数据(通常定期进行),两次维护数据的时间相差较长,足够进行一些数据整理工作。
单组表:总数据量相对小,一个组表文件就能存储了。
复组表:总数据量很大,通常会存入多个分表,形成复组表。
追加:组表没有主键,数据只会追加,不会修改和删除。
更新:组表有主键,数据可能增加、修改、删除。
小量维护:新产生的数据量较小,可以装入内存,一般以序表形式传入。
大量维护:新产生的数据量可能很大,以游标形式传入。
在线:查询可以在数据维护的同时进行。
离线:数据维护时可中断查询,离线是在线的特殊情况,以下例程均按在线要求编写。
例程概述
1、 定期小量主动更新例程
利用组表的补区功能定期维护小量数据,仅对单组表,支持更新,主动到数据源获得更新的数据。
2、 单组表定期维护例程
定期维护大量数据,提供追加和更新两种模式,新产生数据由应用传入。
3、 复组表定期维护例程
定期维护大量数据,提供追加和更新两种模式,新产生数据由应用传入。
4、 实时追加例程
实时追加小量数据,必须用复组表,新产生数据由应用传入。
5、 实时更新例程
实时更新小量数据,必须用复组表,新产生数据由应用传入。
定期小量主动更新例程
背景与方法
有主键的组表可以用补区来实现小量数据更新,但在写入补区时不能同时查询;多次更新后会导致补区太大从而影响查询性能,定期做重整将补区融合进组表时也要中断查询。
本例程示例如何实现组表更新时兼顾查询,适用于增删改量较小的场景。
方法:将当前正在使用的组表复制一遍,然后在其上实施更新动作,原组表继续用于查询,更新完成后,启用新组表用于查询,原组表在下次更新时删除。
查询时,除了读查询组表,还同时读最后一次更新时刻之后的新的更新数据,用update@y实现内存中补区更新,不写入组表文件。这样可以实现实时热数据参与计算。
约定与概念
1. 当前表:正在用于查询的组表,文件名记在config中。
2. 备份表:复制出来用于实施更新的组表,文件名记在config中。完成更新后,将config中的当前表和备份表的文件名互换,之后的查询将基于新的当前表(即之前的备份表)进行。新的备份表(之前的当前表)将会在下次更新前删除。
3. read.splx: 此脚本由应用实现,输入参数为起始时间,结束时间,均为long值。需返回在此时间段内的更新数据,返回结果由两个序表组成的序列,第一个序表为更新序表,即值有修改和新增的记录,第二个序表为删除序表,即需要删除的记录
配置文件说明
配置文件ubc.json
该文件缺省位于集算器的主目录下(如想存到别的目录,请自行修改代码为绝对路径),内容如下:
[{ "updateTimes":3, "resetTimes":5, "lastUpdateTime":null, "sortKey":"account,tdate", "otherFields":"ColName1,ColName2", "dataDir":"data/", "current":"data2.ctx", "backup":"data1.ctx" }] |
"updateTimes": 从上一次重整后,到现在的累计更新次数,服务器初次启动时填0。
"resetTimes": 重整次数,当累计更新次数达到重整次数时,对组表执行一次重整,累计更新次数清零。
"lastUpdateTime": 上一次的更新时刻,表示此时刻之前的数据已更新完毕,下次更新只读此时刻之后的数据,服务器初次启动填null,表示所有数据都要更新。
"sortKey": 组表的主键字段,也是排序字段,也称为维字段,多个用英文逗号分隔。
"otherFields": 组表的其它字段,多个用英文逗号分隔。
"dataDir": 组表存储路径,相对于主目录。
"current": 正在使用的组表文件名,如"data1.ctx"。
"backup": 备份的组表文件名,如"data2.ctx"。
配置举例
某电商系统的店铺表,假定每小时更新一次,每天做一次重整,即每24次更新重整一次,其配置文件如下:
[{
"updateTimes":0,
"resetTimes":24,
"lastUpdateTime":null,
"sortKey":"id",
"otherFields":"ShopID,EmpID,ProductID,Ttime,Quantity,Price,Amount,Discount",
"dataDir":"data/",
"current":"data2.ctx",
"backup":"data1.ctx"
}]
存储结构
主目录下的文件及子目录:
data:组表存储路径,目录名在ubc.json中配置,参见前面介绍
ubc.json:配置文件
data目录下的文件如下所示:
组表文件名在ubc.json中配置,参见前面介绍。
全局变量
current: 当前组表文件名
lastUpdateTime: 上次更新时间
代码解析
init.splx
服务器启动时执行,如果服务器是第一次启动,则需要初始化参数,创建初始组表。
A |
B |
|
1 |
>config=json(file("ubc.json").read()) |
|
2 |
=file(config.dataDir/config.current) |
|
3 |
if(!A2.exists()) |
|
4 |
=A2.create(${config.sortKey.split@c().("#"+trim(~)).concat@c()},${config.otherFields}) |
|
5 |
=B4.close() |
|
6 |
=movefile@cy(config.dataDir/config.current,config.backup) |
|
7 |
=file(config.dataDir/config.backup) |
|
8 |
if(!A7.exists()) |
|
9 |
=movefile@cy(config.dataDir/config.current,config.backup) |
|
10 |
>env(current,config.dataDir/config.current) |
|
11 |
>env(lastUpdateTime,config.lastUpdateTime) |
A1 读配置文件
A3-B6 如果当前组表文件不存在,则创建一个新的,并复制成备份组表文件
A8-B9 如果备份组表文件不存在,则用当前组表复制一个
A10 将当前组表文件名设为全局变量
A11 将上次更新时间设为全局变量
read.splx
这里是示例代码,实际由应用实现。输入参数为起始时间,结束时间,均为long值。需返回在此时间段内更新的数据,返回结果由两个序表组成的序列,第一个序表为更新序表,即值有修改的记录或者新插入的记录,第二个序表为删除序表,即需要删除的记录。序表的字段顺序为【主键字段+其它字段】,和config中配置的一致
A |
|
1 |
=connect("mysql") |
2 |
=A1.query("select account,tdate,ColName1,ColName2 from tbl1 where t1>=? and t1<? and deleted=0",start,end) |
3 |
=A1.query("select account,tdate,ColName1,ColName2 from tbl1 where t1>=? and t1<? and deleted=1",start,end) |
4 |
=A1.close() |
5 |
return [A2,A3] |
A2-A3 例子中t1为更新时间戳字段,deleted为删除标记字段,0表示正常,1表示该记录已被删除
update.splx
被外部程序调用执行,读取上次更新时刻到当前时刻之间的更新数据,更新到备份组表,更新结束后把备份组表设为当前组表,同时弃用原当前组表,下次更新前删除。
累计更新次数,当累计更新次数达到重整次数时,重整组表,并把累计次数清零。
A |
B |
|
1 |
>config=json(file("ubc.json").read()) |
|
2 |
=movefile@cy(config.dataDir/config.current,config.backup) |
|
3 |
=long(datetime@h(now())) |
|
4 |
=call("read1.splx",lastUpdateTime,A3) |
|
5 |
if(A4(1).len()==0 && A4(2).len()==0) |
|
6 |
return |
|
7 |
=file(config.dataDir/config.backup).open() |
|
8 |
=A7.update(A4(1):A4(2)) |
|
9 |
=A7.close() |
|
10 |
>config.updateTimes+=1 |
|
11 |
if(config.updateTimes>=config.resetTimes) |
|
12 |
=file(config.dataDir/config.backup).reset() |
|
13 |
>config.updateTimes=0 |
|
14 |
=config.current |
|
15 |
>config.current=config.backup,current=config.dataDir/config.current |
|
16 |
>config.backup=A14 |
|
17 |
>config.lastUpdateTime=A3,lastUpdateTime=A3 |
|
18 |
=file("ubc.json").write(json(config)) |
A1 读配置文件
A2 将当前组表复制成备份组表
A3 此次的更新时刻,精确到小时
A4 读上次更新时刻到此次更新时刻之间的数据
A5 如果无更新,则跳过
A7 打开备份组表
A8 执行更新
A10 累计更新次数加1
A11 如果累计更新次数达到重整次数
B12 整理备份组表
B13 累计更新次数清零
A14-A16 将备份组表和正式组表文件名对调
A17 将上次更新时间置为此次更新时间
query.splx
查询数据时使用,读取上次更新时刻和当前时刻之间的实时数据,放在内存补区中,返回组表对象
A |
B |
|
1 |
=call("read.splx",lastUpdateTime,long(now())) |
|
2 |
=file(current).open() |
|
3 |
if(A1(1).len()>0 || A1(2).len()>0) |
=A2.update@y(A1(1):A1(2)) |
4 |
return A2 |
A1 读更新数据
A2 打开组表文件
A3-B3 将更新数据写入内存补区,即可参与计算
A4 返回组表对象
说明:使用组表对象后必须关闭
英文版