超多位点高频时序数据的实时存储和统计
一、问题提出
时间序列数据主要由电力行业、化工行业、气象行业、地理信息等各类型实时监测、检查与分析设备所采集、产生的数据,这些工业数据的典型特点是:产生频率快(每一个监测点一秒钟内可产生多条数据)、严重依赖于采集时间(每一条数据均要求对应唯一的时间)、测点多信息量大(常规的实时监测系统均有成千上万的监测点,监测点每秒钟都产生数据,每天产生几十GB甚至更多的数据量)。对数据库的存储成本、写入与查询性能都有很高的要求。
下面我们以某检测系统为例,介绍如何使用开源集算器SPL来处理这类超多位点高频时序数据的实时存储和统计。
需求描述:每秒有最大20万个位点的时序数据需要存储,数据保留时间跨度为1年。基于这些数据,实现任意指定时段的多个测点数据统计,包括最大、最小、平均、方差、中位数等。
我们会按照以下几个步骤顺序逐步探讨:
1) 数据采集
2) 数据存储
3) 数据统计
二、数据采集
数据采集是指将检测设备的数据读取到SPL服务器,通常分为以下两种方式:
1、直接调用检测设备API
SPL直接调用检测设备提供的API,读取数据。由于各类检测设备的API无法统一标准,这种方式需要定制开发调用接口,优点是数据延迟更低,缺点是不够通用,协议转换复杂,开发难度较大。
2、消息队列
预先将设备产生的数据推送到消息队列,SPL从消息队列消费数据。这种方式下,可以直接使用SPL已经提供的消息队列客户端函数连接消息队列并消费数据,优点是通用性更好,缺点是读取到的数据相比API的方式延迟更高。
实际上,无论使用哪种数据采集方式,只要在相同的数据规模与统计要求下,数据存储策略都可以看作是一致的。下文中,我们以从Kafka消费数据的方式采集数据为例,详细介绍如果实现数据存储和统计。
三、数据存储
数据结构如下:
字段名 |
类型 |
中文名 |
TagFullName |
字符串 |
测点名 |
Time |
长整型 |
时间戳 |
Type |
数值 |
数据类型 |
Qualitie |
数值 |
质量码 |
Value |
浮点数 |
数值 |
通常情况下,测点名是以字符串的形式出现的。如果还是按原测点名直接存储数据,存储成本较高,统计时查找效率也会较低,所以要将测点名数字化。这时可能遇到两种情况:一种是测点名无法预先知晓,需要从消费数据中动态获取并数字化成字典表;另一种情况则可以从已知的测点名预先建立数字化字典表,后期再进行较长时间间隔不定时更新字典表。
1、测点名需要动态补充的情况
A |
B |
C |
D |
|
1 |
=file("kafka_client.properties") |
|||
2 |
=kafka_open(A1;"data-transfer") |
|||
3 |
>env(tagsList,if(file("tagsList.btx").exists(),file("tagsList.btx").import@b().sort(name),create(name,id))) |
|||
4 |
=now() |
|||
5 |
for |
=now() |
||
6 |
=kafka_poll(A2).(json(value)).news(Datas;~.TagFullName,round(Time,-3):Time,~.Type,~.Qualitie,~.Value) |
|||
7 |
>B4|=B6 |
|||
8 |
if interval@ms(B5,now())>10000 |
|||
9 |
=B4.id(#1)\tagsList.(#1) |
|||
10 |
if C9!=null |
>env(tagsList,(tagsList.(#1)|C9).new(~:name,#:id).sort(#1)) |
||
11 |
=file("tagsList.btx").export@b(tagsList) |
|||
12 |
=file("url.txt").import@i() |
|||
13 |
=httpfile("http://"/D12(1)/":"/D12(2)/"/httpinit.splx").read() |
|||
14 |
>B4.run(#1=tagsList.select@b(name==~.TagFullName).id) |
|||
15 |
>B4=B4.sort(#1,#2) |
|||
16 |
=maxTime=B4.max(Time) |
|||
17 |
=file("tmp_"/maxTime/".btx").export@b(B4) |
|||
18 |
=movefile(file("tmp_"/maxTime/".btx"),(maxTime/".btx")) |
|||
19 |
>B5=now(),B4=null |
第1、2行:用topic为data-transfer的配置文件kafka_client连接kafka server。
第3行:全局变量tagsList中是数字化后的字典表,包含测点名和数字,按测点名有序。
第6行: kafka中的数据结构是这样的:
{
"Datas":[
{
"TagFullName":"point1",
"Type":10,
"Qualitie":0,
"Value":62.29038255706044
},
…
{
"TagFullName":"point100",
"Type":10,
"Qualitie":0,
"Value":-53.27840963536921
}
],
"Time":1673321221862
}
B6单元格中,SPL的kafka_poll函数,按配置文件内容,从kafka消费一轮数据,解析json并扁平化。
第9行至第13行:每一批内存数据(这里是从kafka消费10秒以上,由B8确定),将第一列去重排序得到的序列与已知字典表中的有序测点名序列求差集,若不为空,则将已有测点名序列并上新增的测点名生成字典表后更新全局变量和外存字典表。另外由于统计的http服务为另一个进程,所以还需要更新统计服务的全局变量,详细介绍见下文统计章节内容。
第14行:现在有了字典表,就可以将当前批的内存数据中测点名数字化了。
第15行:按将内存中的数据按测点、时间有序排序。
第16行至第18行:将排序后的内存数据导出到按本批次最大时间戳命名的集文件中。
第19行:重置下一轮的起始时间,清空内存数据。
2、已知一批测点名后续再更新的情况
从kafka消费数据并持久化如下:
A |
B |
C |
|
1 |
=file("kafka_client.properties") |
||
2 |
=kafka_open(A1;"data-transfer") |
||
3 |
>env(tagsList,file("tagsList.btx").import@b().sort(name)) |
||
4 |
=now() |
||
5 |
for |
=now() |
|
6 |
=kafka_poll(A2).(json(value)).news(Datas;tagsList.select@b(name==~.TagFullName).id:TagFullName,round(Time,-3):Time,~.Type,~.Qualitie,~.Value) |
||
7 |
>B4=[B4,B6.sort(#1,#2)].merge(#1,#2) |
||
8 |
if interval@ms(B5,now())>=10000 |
||
9 |
=maxTime=B4.max(Time) |
||
10 |
=file("tmp_"/maxTime/".btx").export@b(B4) |
||
11 |
=movefile(file("tmp_"/maxTime/".btx"),maxTime/".btx") |
||
12 |
>B5=now(),B4=null |
这部分代码在前面几乎都出现过,不再详细解释。
新增测点名的方式,我们也通过从kafka消费数据来得知并添加,如下:
A |
B |
C |
D |
|
1 |
=file("addpoint.properties") |
|||
2 |
=kafka_open(A1;"insert-point") |
|||
3 |
for |
=now() |
||
4 |
=kafka_poll(A2).conj(json(value)) |
|||
5 |
if B4.len()>0 |
=B4.sort()\tagsList.(#1) |
||
6 |
if C5!=null |
=C5.len() |
||
7 |
>env(tagsList,(tagsList.(#1)|C5).new(~:name,#:id).sort(#1)) |
|||
8 |
=file("tagsList.btx").export@b(tagsList) |
|||
9 |
=file("url.txt").import@i() |
|||
10 |
=httpfile("http://"/D9(1)/":"/D9(2)/"/httpinit.splx").read() |
这部分代码同样在前面出现过,不再详细解释,需要注意的是,以上的两个SPL脚本需要再同一个进程中执行,因为都用到了tagsList这个全局变量。
3、补数据
实际生成环境中,还可能出现一种情况,若相邻两秒的数据没有发生变化,则后一秒的数据不会再被推送至kafka,按上述代码持久化时就会出现漏数据的问题,这时就需要在内存中将数据补全后再写到文件。
基于前面代码,补数据的场景需要改成这样:
A |
B |
C |
|
1 |
=file("kafka_client.properties") |
||
2 |
=kafka_open(A1;"data-transfer") |
||
3 |
>env(tagsList,file("tagsList.btx").import@b().sort(name)) |
||
4 |
>env(valsList,tagsList.len().([,,])) |
||
5 |
=now() |
||
6 |
for |
=now() |
|
7 |
=kafka_poll(A2).(json(value)).news(Datas;tagsList.select@b(name==~.TagFullName).id:TagFullName,round(Time,-3):Time,~.Type,~.Qualitie,~.Value) |
||
8 |
>B5=[B5,B7.sort(#1,#2)].merge(#1,#2) |
||
9 |
if interval@ms(B6,now())>=12000 |
||
10 |
=valsList.len().run(cedianhao=~,~=10.new(cedianhao:TagFullName,minTime+(~-1)*1000:Time,Type,Qualitie,Value)).conj() |
||
11 |
=B5.select(Time<(minTime+10000)) |
||
12 |
=C11.([#1,#2]) |
||
13 |
>C10.select(C12.contain@b([TagFullName,Time])).run(Type=C11(#).Type,Qualitie=C11(#).Qualitie,Value=C11(#).Value) |
||
14 |
>C10.run(tfn=#1,if(Value==null,Type=valsList(tfn)(1),valsList(tfn)(1)=Type),if(Value==null,Qualitie=valsList(tfn)(2),valsList(tfn)(2)=Qualitie),if(Value==null,Value=valsList(tfn)(3),valsList(tfn)(3)=Value)) |
||
15 |
=file("tmp_"/(minTime+9000)/".btx").export@b(C10) |
||
16 |
=movefile(file("tmp_"/(minTime+9000)/".btx"),(minTime+9000)/".btx") |
||
17 |
>B5=B5.select(Time>=(minTime+10000)),minTime+=10000 |
||
18 |
>B6=datetime(long(B6)+10000) |
代码中多了全局变量valsList,用来存储每个测点上一秒的数据。
第9行:每轮消费时间间隔为12秒,因为考虑到推送数据有网络延迟的情况,这里假定不大于两秒,则12秒内的前10秒数据一定已经确保拿到了。
第10行:因为测点名和时间都是已知的,所以预先建立其他数据为空的序表。
第11行至第14行:选出12秒中的前10秒数据,填充第10行的空序表,再利用valsList补填完后的序表得到完整的内存数据。
新增测点名的方式的代码基本不变,只需要添加>env(valsList,valsList|=C5.([,,]))来确保上一秒测点号长度一致即可。
按以上任意情况实现,都是每隔10秒就会产生1个集文件,区别在于如何更新测点名字典表以及是否需要补数据。数据存储功能做到这个程度还不够,因为统计时间段较长的话,涉及到的文件个数就多了,查找次数也就多了,这会影响统计的效率。需要再按实际的数据规模,将数据再分层合并。
4、分层合并数据
合并数据的目的主要是为了统计效率,有两点要考虑:单个数据文件内的时间跨度和文件个数。对前者,跨度越小,则根据统计条件中的时间段越好直接选出有效文件;对后者,文件个数越多,查找次数也就越多。两者是矛盾的,需要按实际数据规模测试找出折中点。
以10分钟为一层举例,分层代码如下:
A |
B |
|
1 |
=t=0 |
|
2 |
for |
=now() |
3 |
=directory("?????????????.btx").conj().sort(file(~).date()).m(:-2) |
|
4 |
=B3.max(long(left(~,13))) |
|
5 |
=B3.(file(~).cursor@b()).mergex(#1,#2) |
|
6 |
=file("tmpl2_"/B4/".ctx").create@y(#TagFullName,#Time,Type,Qualitie,Value) |
|
7 |
>B6.append(B5) |
|
8 |
>B6.close() |
|
9 |
=movefile(file("tmpl2_"/B4/".ctx"),"l2_"/B4/".ctx") |
|
10 |
=B3.(movefile(~)) |
|
11 |
>t=interval@ms(B2,now()) |
|
12 |
=sleep(600000-t) |
每10分钟将当前n-1个每10秒测点、时间有序集文件归并为一个组表文件(因为10分钟20万测点的数据量足够大,否则仍采用集文件,直到某一层数据量够大。实际应用中,通常用集文件存储小表,用组表存储大表。这样做,主要是因为大表对性能的影响很大,存储成组表有利于提升系统整体性能。而且,组表头部有个索引块,即使只有一行数据也要至少占一个块,列存会占得更多。索引块对于大数据来说可以忽略,但对小数据来说就不合适了)。
四、数据统计
统计服务通常需要被其他系统调用,可以使用SPL自带的http服务来满足需求。但由于http服务与后台存储数据不属于同一个进程,无法共享数据存储功能中的全局变量,每当测点表更新时,需要同步字典表。
1、同步测点名字典表
根据最新的数字化测点名文件来同步统计服务的字典表,代码比较简单,只要一句:
>env(tagsList,file("tagsList.btx").import@b().sort(name))
有了字典表,再基于合适的分层文件,可以实现高效的统计功能,但是因为大小表存储采用的文件类型不同,所以统计时需要对不同类型文件的分别计算后合并结果。
2、组表和集文件的混合计算
以平均值为例:
A |
B |
C |
|
1 |
=tags |
=st |
=et |
2 |
=directory("l*.ctx").sort(mid(~,4,13)) |
||
3 |
=A2.pselect(long(mid(~,4,13))>=st) |
||
4 |
=A2.pselect(long(mid(~,4,13))>=et) |
||
5 |
if A3!=null || A4!=null |
=ctxFiles=A2.m(ifn(A3,1):ifn(A4,-1)) |
|
6 |
>ctxResult=null |
||
7 |
for A1 |
>ctxResult|=ctxFiles.(file(~).open().cursor(TagFullName,Time,Value;TagFullName==B7).select(between@r(Time,st:et))).conjx().new(TagFullName,Value).fetch() |
|
8 |
else |
>ctxResult=null |
|
9 |
=directory("?????????????.btx").sort(left(~,13)) |
||
10 |
fork A9 |
=file(B10).iselect@rb(A1,TagFullName).select(between@r(Time,st:et)).new(TagFullName,Value).fetch() |
|
11 |
=btxResult=B10.conj().sort(#1) |
||
12 |
=[ctxResult,btxResult].conj().group(#1).conj(~.select((#-1)%sn==0).groups(#1;avg(#2))).run(tmp=#1,#1=tagsList.select(#2==tmp).#1) |
tags是数字化后的测点名,如:[51,2,39,…],st是统计起始时间戳,et是统计结束时间戳。
第2行至第8行:找出满足时间区间的组表文件,依次查找满足统计条件的测点、数值。
第9行至第11行:并行查找所有集文件中满足统计条件的测点、数值。
第12行:混合计算出结果并将数字化测点名转成串。
英文版