SPL 高性能时序数据读写例程
总体说明
本文档记录基于 esProc SPL 的一套高性能时序数据读写方案,包括:
· 配置文件说明
· 数据写入(内存缓存与持久化策略)
· 多层级文件归并(自动分层 + 自动清理)
· 数据读取(实时 + 历史分层自动查询)
适用于需要高吞吐、长时间序数据管理的场景,例如设备监测点位、IoT 数据、财务盘口数据等。
配置文件
config.json
示例
{
"fields": [
{"user_fieldname": "oid", "sys_fieldname": "id"},
{"user_fieldname": "odt", "sys_fieldname": "dt"},
{"user_fieldname": "val", "sys_fieldname": "val"},
{"user_fieldname": "otype", "sys_fieldname": "type"},
{"user_fieldname": "quality", "sys_fieldname": "quality"}
],
"timezone": [5,600,3600,21600,86400],
"maxcalctime": 3
}
字段说明
字段 |
说明 |
fields |
用户系统字段名 → SPL 存储字段名映射 |
timezone |
多层级时间间隔(秒),决定缓存与分层归并时间长度 |
maxcalctime |
最大计算时间,用于判断文件是否需要删除 |
timezone 示例解释
[5, 600, 3600, 21600, 86400]
含义:
层1(缓存):5 秒写一次缓存层数据
层2(l1):600 秒(10分钟)生成一层
层3(l2):3600 秒(一小时)生成一层
层4(l3):21600 秒(6 小时)生成一层
层5(l4):86400 秒(1 天)生成一层
初始化脚本
init.splx
A |
B |
|
1 |
=json(file("config.json").read()) |
/读配置文件 |
2 |
=env(rname,A1.fields.array().m(2:).(~.concat(":")).concat@c()) |
/按配置信息拼出rname(用户系统中的字段与SPL存储字段的对应关系,用户系统字段:SPL中的字段) |
3 |
=env(cfileds,A1.fields.(sys_fieldname).concat@c()) |
/按配置信息拼出,设为全局变量cfileds(创建ctx时使用) |
4 |
=env(tz,A1.timezone) |
/按配置信息读取分层时间,设为全局变量tz |
5 |
=env(mct,A1.maxcalctime) |
/按配置信息读取最大计算时间,设为全局变量mct |
6 |
=env(mem_data,create(${cfileds})) |
/内存缓存,设为全局变量mem_data |
7 |
=if(!file("qfl.btx").exists(),file("qfl.btx").export@bi(tz.len().([]))) |
/待查询文件列表(持久化)【根据tz层数确定】 |
8 |
=if(!file("dfl.btx").exists(),file("dfl.btx").export@bi((tz.len()-1).(null))) |
/待删除文件列表(持久化)【根据tz层数确定】 |
9 |
=if(!file("tags.btx").exists(),env(tags,create(tagid)),env(tags,file("tags.btx").import@b().keys(#1))) |
/位号列表(持久化) |
10 |
=call@n("write.splx") |
/写缓存与第一层btx的处理 |
11 |
=call@n("merge.splx") |
/高层组表归并 |
主要功能
1. 加载配置
2. 建立全局变量
3. 创建持久化文件:
· qfl.btx(可查询文件列表)
· dfl.btx(待删除文件列表)
· tags.btx(点位 ID 持久化)
4. 调用写入与归并脚本
初始化步骤概述
步骤 |
功能 |
读取 config.json |
得到字段映射、分层时间、最大计算时间 |
env(rname) |
构造字段映射字符串,如 "oid:id" |
env(cfields) |
生成 SPL 存储字段列表,用于创建 ctx |
初始化 mem_data |
内存缓存:create(id,dt,val,type,quality) |
初始化 qfl.btx |
每层的可查询文件名列表 |
初始化 dfl.btx |
每层待删除的文件名记录 |
初始化 tags.btx |
点位(tagid)列表 |
调用 write.splx |
启动持续写入 |
调用 merge.splx |
启动后台层级归并 |
数据写入脚本
write.splx
A |
B |
|
1 |
>output("write_start...") |
|
2 |
=now() |
|
3 |
=long(elapse@s(A2,-tz(1))) |
/当前时间的前tz(1)时间点(比当前小) |
4 |
=long(elapse@s(A2,-mct)) |
/当前时间的前mct时间点(比当前小) |
5 |
=lock("dfl") |
|
6 |
=file("dfl.btx").import@bi() |
/读出待删除文件列表【合并后生产】 |
7 |
=A6.pselect@a(~ && A4>=~(1)) |
/找出每层待删除的文件,并且待删除时间点要小于A4 |
8 |
=A6(A7).conj(~(2)).(movefile(~)) |
/删文件 |
9 |
>A6(A7)=null |
/把已经删了的文件,对应的列表清空 |
10 |
=file("dfl.btx").export@b(A6) |
/删除后剩余列表,写回文件 |
11 |
=lock@u("dfl") |
|
12 |
=call("data_gen_5.splx",now(),tz(1)).new(${rname}) |
/模拟接口,按当前时间,tz(1)的频率拿前tz(1)时间段的数据,转为对应SPL的字段 |
13 |
=tags.len() |
|
14 |
=[A12.groups(id).(#1),tags.(#1)].merge@od().new(~:tagid,A13+#:id) |
/新增位号列表 |
15 |
=lock("tags") |
|
16 |
=if(A14.len()==0,,(tags=(tags|A14).derive().keys(tagid),file("tags.btx").export@ab(A14))) |
/追加位号,并持久化 |
17 |
=lock@u("tags") |
|
18 |
=A12.run(#1=int(tags.select@b(tagid==#1).id)).sort(#1,#2) |
/位号值转为序号存储(位号,时间是固定位置) |
19 |
=tmp=mem_data.select(id==1) |
/这里认为点位1永远存在,情况有变再调整下 |
20 |
=A12.max(#2) |
|
21 |
if between@lr(A19.max(dt)-A19.min(dt),tz(2)-tz(1)*2:tz(2)) |
/当内存缓存时间段达到tz(2)的时间长度,持续追加的缓存写到btx,内存缓存开始新一轮追加,更新待查询列表 |
22 |
=file("l1"/A20/".btx").export@b(mem_data) |
|
23 |
=mem_data=A18 |
|
24 |
=file("c"/A20/".btx").export@b(mem_data) |
|
25 |
=lock("qfl") |
|
26 |
=file("qfl.btx").import@bi() |
|
27 |
=1*B26 |
|
28 |
=B27(2)=B27(2)|[A20] |
|
29 |
=B27(1)=[A20] |
|
30 |
=file("qfl.btx").export@b(B27) |
|
31 |
=lock@u("qfl") |
|
32 |
else |
/追加内存缓存(按位点、时间归并),写出缓存btx,更新带查询列表 |
33 |
=mem_data=[mem_data,A18].merge(id,dt) |
|
34 |
=file("c"/A20/".btx").export@b(mem_data) |
|
35 |
=lock("qfl") |
|
36 |
=file("qfl.btx").import@bi() |
|
37 |
=1*B36 |
|
38 |
=B37(1)=[A20] |
|
39 |
=file("qfl.btx").export@b(B37) |
|
40 |
=lock@u("qfl") |
|
41 |
=directory("c*.btx") |
/找出缓存btx,并清理。调整A42控制清理缓存btx的范围 |
42 |
if A41.len()>2 |
=A41.sort().m(:-3).(movefile(~)) |
43 |
>output("w:"/(tz(1)*1000-interval@ms(A2,now()))) |
|
44 |
=sleep(tz(1)*1000-interval@ms(A2,now())) |
|
45 |
goto A1 |
主要功能
1. 模拟数据接口读取
2. 写入内存缓存
3. 缓存达到一定长度后导出为层2(l1)的btx 文件
4. 更新qfl.btx
5. 清理过期层1(缓存)btx
6. 写入/追加 tags
写入流程
清理 dfl 中需要删除的文件
所有层都会产生过期文件,它们由 dfl.btx 维护。
· 加锁
· 遍历每个层级
· 删除对应文件
· 更新表项并写回
· 解锁
获取最新模拟数据
call("data_gen.splx", now(), tz(1))
按 tz(1)=5s 生成最近一批数据。
更新 tag 列表(点位 ID)
· 将新产生的点位加入 tags
· 持久化到 tags.btx
写入缓存 / 写入新文件
判断 mem_data 的时间跨度是否达到下一层级限制:
达到阈值
导出 l1 文件
mem_data = 新数据
更新 qfl.btx
未达到阈值
mem_data = mem_data ∪ 新数据(按 id/dt merge)
导出缓存文件
更新 qfl.btx
清理缓存 btx
防止缓存文件过多
分层归并脚本
merge.splx
A |
B |
|
1 |
>output("m 1st time sleep(s):"/tz(3)) |
|
2 |
=sleep(tz(3)*1000) |
|
3 |
>output("merge_start...") |
|
4 |
=now() |
|
5 |
=file("qfl.btx").import@bi() |
/读查询文件列表 |
6 |
=A5.m(2:-2).(lfn=~.m(-1),if(#==1 && ~!=[],(btxs=~.("l1"/~/".btx"),mc(btxs.(file(~).cursor@b()).merge(#1,#2),btxs,#,lfn)),if(#>1 && ~.len()>=tz(2+#)\tz(1+#),(lnum=#,ctxs=~.("l"/lnum/~/".ctx"),mc(ctxs.(file(~).open().cursor()).merge(#1,#2),ctxs,#,lfn))))) |
/对非缓存层,归并到组表 |
7 |
func mc(cs,fl,lv,mfn) |
|
8 |
=file("l"/(1+lv)/mfn/".ctx").create(${cfileds}) |
|
9 |
=B8.append@i(cs) |
|
10 |
>B8.close() |
|
11 |
=lock("qfl") |
|
12 |
=file("qfl.btx").import@bi() |
|
13 |
=B12(lv+2)|=mfn |
|
14 |
=B12(lv+1).delete(fl.len().()) |
|
15 |
=file("qfl.btx").export@b(B12) |
|
16 |
=lock@u("qfl") |
|
17 |
=lock("dfl") |
|
18 |
=file("dfl.btx").import@bi() |
|
19 |
>B18(lv)=[long(now()),fl] |
|
20 |
=file("dfl.btx").export@b(B18) |
|
21 |
=lock@u("dfl") |
|
22 |
>output("m:"/(tz(3)*1000-interval@ms(A4,now()))) |
|
23 |
=sleep(tz(3)*1000-interval@ms(A4,now())) |
|
24 |
goto A3 |
主要功能
将下层文件自动向上层归并,当数量达到指定阈值,就生成更高层级的 ctx 文件。
归并规则
例如:
· l1(10分钟)如果有 3600/600 = 6 个文件
→ 合并到 l2(1小时)
· l2(1小时)有 6 个文件
→ 合并到 l3(6小时)
以此类推。
归并子程序
func mc(cs,fl,lv,mfn)
创建新 ctx 文件
将多个源文件 cursor 合并
更新 qfl(删除老文件,加入新文件)
在 dfl 中记录需要删除的老文件
数据读取逻辑
read.splx
A |
B |
C |
|
1 |
=now() |
||
2 |
=rand@s(1) |
||
3 |
=n |
||
4 |
=et=long(now())\1000 |
/当前时间(long秒) |
|
5 |
=st=et-t-5 |
||
6 |
=file("qfl.btx").import@bi() |
||
7 |
=A6.(~.select(~>=st)) |
||
8 |
=A6.(~.select(~<=et+86400)) |
||
9 |
=A7.(~^A8(#)) |
||
10 |
=A9.(if(#==1 && ~,~.("c"/~/".btx"),if(#==2 && ~,~.("l1"/~/".btx"),if(#==3 && ~!=[null,null],~.("l2"/~/".ctx"),if(#==4 && ~!=[null,null],~.("l3"/~/".ctx"),if(~!=[null,null],~.("l4"/~/".ctx"))))))) |
||
11 |
=ctxFiles=A10.m(3:).conj() |
||
12 |
>ctxResult=null |
||
13 |
=now() |
||
14 |
=now() |
||
15 |
if ctxFiles.len()>0 |
fork ctxFiles |
=file(B15).open().cursor@mx(;id==n && dt>=st && dt<et).fetch() |
16 |
=B15.conj() |
||
17 |
|||
18 |
=interval@ms(A14,now()) |
||
19 |
=now() |
||
20 |
if A10(1).len()>0 |
fork A10(1)|A10(2) |
|
21 |
=file(B20).iselect@rb(A3,id).fetch().select(dt>=st && dt<et) |
||
22 |
=B20.conj() |
||
23 |
=interval@ms(A19,now()) |
||
24 |
=B16|B22 |
>output("数据量:"/A24.len()) |
|
25 |
=interval@ms(A13,now()) |
>output("耗时(ms):"/A25) |
1. 按参数n(点位)、t(时间段)确定查询点位、时间范围
2. 读取 qfl.btx 找到可能覆盖时间范围的文件
· 分两部分执行查询
· 查询 ctx(高层组表,历史文件)
· 使用多线程:
· cursor@mx(; id==n && dt>=st && dt<et)
· 查询 btx(最近数据 + 缓存)
· 对 btx 执行:
· iselect@rb(A3,id).fetch().select(...)
3. 合并两部分结果
文件结构
以示例配置时间区间为例
project/
├── config.json
├── init.splx
├── write.splx
├── merge.splx
├── data_gen.splx
├── c${timestamp}.btx ←缓存 btx
├── l1${timestamp}.btx ←10分钟层文件
├── l2${timestamp}.ctx ←1小时层
├── l3${timestamp}.ctx ←6小时层
├── l4${timestamp}.ctx ←1天层
├── tags.btx
├── qfl.btx
└── dfl.btx
