SPL 高性能时序数据读写例程

总体说明

本文档记录基于 esProc SPL 的一套高性能时序数据读写方案,包括:

· 配置文件说明

· 数据写入(内存缓存与持久化策略)

· 多层级文件归并(自动分层 + 自动清理)

· 数据读取(实时 + 历史分层自动查询)

适用于需要高吞吐、长时间序数据管理的场景,例如设备监测点位、IoT 数据、财务盘口数据等。

配置文件

config.json

示例

{

"fields": [

{"user_fieldname": "oid", "sys_fieldname": "id"},

{"user_fieldname": "odt", "sys_fieldname": "dt"},

{"user_fieldname": "val", "sys_fieldname": "val"},

{"user_fieldname": "otype", "sys_fieldname": "type"},

{"user_fieldname": "quality", "sys_fieldname": "quality"}

],

"timezone": [5,600,3600,21600,86400],

"maxcalctime": 3

}

字段说明

字段

说明

fields

用户系统字段名 → SPL 存储字段名映射

timezone

多层级时间间隔(秒),决定缓存与分层归并时间长度

maxcalctime

最大计算时间,用于判断文件是否需要删除

timezone 示例解释

[5, 600, 3600, 21600, 86400]

含义:

  • 1(缓存)5 秒写一次缓存层数据

  • 2l1600 秒(10分钟)生成一层

  • 3l23600 秒(一小时)生成一层

  • 4l321600 秒(6 小时)生成一层

  • 5l486400 秒(1 天)生成一层

初始化脚本

init.splx


A

B

1

=json(file("config.json").read())

/读配置文件

2

=env(rname,A1.fields.array().m(2:).(~.concat(":")).concat@c())

/按配置信息拼出rname(用户系统中的字段与SPL存储字段的对应关系,用户系统字段:SPL中的字段)

3

=env(cfileds,A1.fields.(sys_fieldname).concat@c())

/按配置信息拼出,设为全局变量cfileds(创建ctx时使用)

4

=env(tz,A1.timezone)

/按配置信息读取分层时间,设为全局变量tz

5

=env(mct,A1.maxcalctime)

/按配置信息读取最大计算时间,设为全局变量mct

6

=env(mem_data,create(${cfileds}))

/内存缓存,设为全局变量mem_data

7

=if(!file("qfl.btx").exists(),file("qfl.btx").export@bi(tz.len().([])))

/待查询文件列表(持久化)【根据tz层数确定】

8

=if(!file("dfl.btx").exists(),file("dfl.btx").export@bi((tz.len()-1).(null)))

/待删除文件列表(持久化)【根据tz层数确定】

9

=if(!file("tags.btx").exists(),env(tags,create(tagid)),env(tags,file("tags.btx").import@b().keys(#1)))

/位号列表(持久化)

10

=call@n("write.splx")

/写缓存与第一层btx的处理

11

=call@n("merge.splx")

/高层组表归并

主要功能

1. 加载配置

2. 建立全局变量

3. 创建持久化文件:

· qfl.btx(可查询文件列表)

· dfl.btx(待删除文件列表)

· tags.btx(点位 ID 持久化)

4. 调用写入与归并脚本

初始化步骤概述

步骤

功能

读取 config.json

得到字段映射、分层时间、最大计算时间

env(rname)

构造字段映射字符串,如 "oid:id"

env(cfields)

生成 SPL 存储字段列表,用于创建 ctx

初始化 mem_data

内存缓存:create(id,dt,val,type,quality)

初始化 qfl.btx

每层的可查询文件名列表

初始化 dfl.btx

每层待删除的文件名记录

初始化 tags.btx

点位(tagid)列表

调用 write.splx

启动持续写入

调用 merge.splx

启动后台层级归并

数据写入脚本

write.splx


A

B

1

>output("write_start...")


2

=now()


3

=long(elapse@s(A2,-tz(1)))

/当前时间的前tz(1)时间点(比当前小)

4

=long(elapse@s(A2,-mct))

/当前时间的前mct时间点(比当前小)

5

=lock("dfl")


6

=file("dfl.btx").import@bi()

/读出待删除文件列表【合并后生产】

7

=A6.pselect@a(~ && A4>=~(1))

/找出每层待删除的文件,并且待删除时间点要小于A4

8

=A6(A7).conj(~(2)).(movefile(~))

/删文件

9

>A6(A7)=null

/把已经删了的文件,对应的列表清空

10

=file("dfl.btx").export@b(A6)

/删除后剩余列表,写回文件

11

=lock@u("dfl")


12

=call("data_gen_5.splx",now(),tz(1)).new(${rname})

/模拟接口,按当前时间,tz(1)的频率拿前tz(1)时间段的数据,转为对应SPL的字段

13

=tags.len()


14

=[A12.groups(id).(#1),tags.(#1)].merge@od().new(~:tagid,A13+#:id)

/新增位号列表

15

=lock("tags")


16

=if(A14.len()==0,,(tags=(tags|A14).derive().keys(tagid),file("tags.btx").export@ab(A14)))

/追加位号,并持久化

17

=lock@u("tags")


18

=A12.run(#1=int(tags.select@b(tagid==#1).id)).sort(#1,#2)

/位号值转为序号存储(位号,时间是固定位置)

19

=tmp=mem_data.select(id==1)

/这里认为点位1永远存在,情况有变再调整下

20

=A12.max(#2)


21

if between@lr(A19.max(dt)-A19.min(dt),tz(2)-tz(1)*2:tz(2))

/当内存缓存时间段达到tz(2)的时间长度,持续追加的缓存写到btx,内存缓存开始新一轮追加,更新待查询列表

22


=file("l1"/A20/".btx").export@b(mem_data)

23


=mem_data=A18

24


=file("c"/A20/".btx").export@b(mem_data)

25


=lock("qfl")

26


=file("qfl.btx").import@bi()

27


=1*B26

28


=B27(2)=B27(2)|[A20]

29


=B27(1)=[A20]

30


=file("qfl.btx").export@b(B27)

31


=lock@u("qfl")

32

else

/追加内存缓存(按位点、时间归并),写出缓存btx,更新带查询列表

33


=mem_data=[mem_data,A18].merge(id,dt)

34


=file("c"/A20/".btx").export@b(mem_data)

35


=lock("qfl")

36


=file("qfl.btx").import@bi()

37


=1*B36

38


=B37(1)=[A20]

39


=file("qfl.btx").export@b(B37)

40


=lock@u("qfl")

41

=directory("c*.btx")

/找出缓存btx,并清理。调整A42控制清理缓存btx的范围

42

if A41.len()>2

=A41.sort().m(:-3).(movefile(~))

43

>output("w:"/(tz(1)*1000-interval@ms(A2,now())))


44

=sleep(tz(1)*1000-interval@ms(A2,now()))


45

goto A1


主要功能

1. 模拟数据接口读取

2. 写入内存缓存

3. 缓存达到一定长度后导出为层2l1)的btx 文件

4. 更新qfl.btx

5. 清理过期层1(缓存)btx

6. 写入/追加 tags

写入流程

清理 dfl 中需要删除的文件

所有层都会产生过期文件,它们由 dfl.btx 维护。

· 加锁

· 遍历每个层级

· 删除对应文件

· 更新表项并写回

· 解锁

获取最新模拟数据

call("data_gen.splx", now(), tz(1))

tz(1)=5s 生成最近一批数据。

更新 tag 列表(点位 ID

· 将新产生的点位加入 tags

· 持久化到 tags.btx

写入缓存 / 写入新文件

判断 mem_data 的时间跨度是否达到下一层级限制:

达到阈值

导出 l1 文件

mem_data = 新数据

更新 qfl.btx

未达到阈值

mem_data = mem_data 新数据(按 id/dt merge

导出缓存文件

更新 qfl.btx

清理缓存 btx

防止缓存文件过多

分层归并脚本

merge.splx


A

B

1

>output("m 1st time sleep(s):"/tz(3))


2

=sleep(tz(3)*1000)


3

>output("merge_start...")


4

=now()


5

=file("qfl.btx").import@bi()

/读查询文件列表

6

=A5.m(2:-2).(lfn=~.m(-1),if(#==1 && ~!=[],(btxs=~.("l1"/~/".btx"),mc(btxs.(file(~).cursor@b()).merge(#1,#2),btxs,#,lfn)),if(#>1 && ~.len()>=tz(2+#)\tz(1+#),(lnum=#,ctxs=~.("l"/lnum/~/".ctx"),mc(ctxs.(file(~).open().cursor()).merge(#1,#2),ctxs,#,lfn)))))

/对非缓存层,归并到组表

7

func mc(cs,fl,lv,mfn)


8


=file("l"/(1+lv)/mfn/".ctx").create(${cfileds})

9


=B8.append@i(cs)

10


>B8.close()

11


=lock("qfl")

12


=file("qfl.btx").import@bi()

13


=B12(lv+2)|=mfn

14


=B12(lv+1).delete(fl.len().())

15


=file("qfl.btx").export@b(B12)

16


=lock@u("qfl")

17


=lock("dfl")

18


=file("dfl.btx").import@bi()

19


>B18(lv)=[long(now()),fl]

20


=file("dfl.btx").export@b(B18)

21


=lock@u("dfl")

22

>output("m:"/(tz(3)*1000-interval@ms(A4,now())))


23

=sleep(tz(3)*1000-interval@ms(A4,now()))


24

goto A3


主要功能

将下层文件自动向上层归并,当数量达到指定阈值,就生成更高层级的 ctx 文件。

归并规则

例如:

· l110分钟)如果有 3600/600 = 6 个文件
合并到 l21小时)

· l21小时)有 6 个文件
合并到 l36小时)

以此类推。

归并子程序

func mc(cs,fl,lv,mfn)

  1. 创建新 ctx 文件

  2. 将多个源文件 cursor 合并

  3. 更新 qfl(删除老文件,加入新文件)

  4. dfl 中记录需要删除的老文件

数据读取逻辑

read.splx


A

B

C

1

=now()



2

=rand@s(1)



3

=n



4

=et=long(now())\1000

/当前时间(long秒)


5

=st=et-t-5



6

=file("qfl.btx").import@bi()



7

=A6.(~.select(~>=st))



8

=A6.(~.select(~<=et+86400))



9

=A7.(~^A8(#))



10

=A9.(if(#==1 && ~,~.("c"/~/".btx"),if(#==2 && ~,~.("l1"/~/".btx"),if(#==3 && ~!=[null,null],~.("l2"/~/".ctx"),if(#==4 && ~!=[null,null],~.("l3"/~/".ctx"),if(~!=[null,null],~.("l4"/~/".ctx")))))))



11

=ctxFiles=A10.m(3:).conj()



12

>ctxResult=null



13

=now()



14

=now()



15

if ctxFiles.len()>0

fork ctxFiles

=file(B15).open().cursor@mx(;id==n && dt>=st && dt<et).fetch()

16


=B15.conj()


17




18

=interval@ms(A14,now())



19

=now()



20

if A10(1).len()>0

fork A10(1)|A10(2)


21



=file(B20).iselect@rb(A3,id).fetch().select(dt>=st && dt<et)

22


=B20.conj()


23

=interval@ms(A19,now())



24

=B16|B22

>output("数据量:"/A24.len())


25

=interval@ms(A13,now())

>output("耗时(ms):"/A25)


1. 按参数n(点位)、t(时间段)确定查询点位、时间范围

2. 读取 qfl.btx 找到可能覆盖时间范围的文件

· 分两部分执行查询

· 查询 ctx(高层组表,历史文件)

· 使用多线程:

· cursor@mx(; id==n && dt>=st && dt<et)

· 查询 btx(最近数据 + 缓存)

· btx 执行:

· iselect@rb(A3,id).fetch().select(...)

3. 合并两部分结果


文件结构

以示例配置时间区间为例

project/

├── config.json

├── init.splx

├── write.splx

├── merge.splx

├── data_gen.splx

├── c${timestamp}.btx ←缓存 btx

├── l1${timestamp}.btx ←10分钟层文件

├── l2${timestamp}.ctx ←1小时层

├── l3${timestamp}.ctx ←6小时层

├── l4${timestamp}.ctx ←1天层

├── tags.btx

├── qfl.btx

└── dfl.btx