SPL 实践:单节点实现每日百亿时序数据实时写入和秒级统计
问题描述
发电设备中会放置传感器(DCS)来采集数据以监控设备运转的状况,某集团设计的电力监控统计系统,需要按固定频率实时采集多个传感器(以下称为测点)上测量的数据后保存,然后提供按任意时间区间内指定测点数据的查询统计功能,统计内容包括最大、最小、平均、方差、中位数等。
数据结构与规模
字段名称 |
字段含义 |
字段类型 |
示例数据 |
id |
测点名 |
String |
Point1 |
dt |
时间戳(秒) |
Int |
946656000 |
type |
数据类型 |
Int |
10 |
qualitie |
质量码 |
Int |
0 |
val |
数值 |
Float |
62.29 |
系统设计规模将支持20万个测点,采集频率为每秒一次,即每秒总共会有20万条数据,数据保存周期在1年以上。
环境与期望
系统要求如下:
1、按指定频率不间断地从消息队列中获取数据并及时存储,不得使队列中数据发生积留;
2、对于10秒前的任意不超过一天的时间区间,从20万个测点中任取100个测点的数据,分别基于每个测点的数值序列完成统计。统计时间要求在30秒内。
满足这两个要求时,期望耗用的硬件资源尽量少。
问题分析
高速写入与高性能统计是矛盾
时序数据库通常会针对时间序列数据进行专门的优化,以满足大规模时间序列数据的高效写入需求,要满足每秒20万条数据量的时序写入不成问题。但只按时间有序存储,要从时间区间为1天的20万个测点中找出任意100个测点的数据进行统计,只能遍历时间区间内的数据,单个测点一天也有86400条记录,20万个测点超过170亿条,空间占用约500G以上。要想在30秒内遍历完这个规模的数据,理想情况下至少也需要50块以上读速300M的固态硬盘。
数据库常用的办法是针对被查找的字段建立索引以提升查找效率。但数据持续写入的同时维护索引的成本很高(涉及插入排序等复杂动作),这会导致写入性能严重下降。
也就是说:时序数据的高速写入和高性能计算存在矛盾。
实际上,即使建立索引也没用。因为每次需要统计的测点名是随机的,索引只能快速定位数据,但这些数据如果在外存中不连续存储,硬盘有最小读取单位,会导致大量无用数据的读出,使得计算变得很慢。
物理有序
如果数据可以按测点物理有序存储,待查找的测点记录变得紧凑了,浪费的读取量很少,查找的速度就变快了。
当只有历史冷数据时,定时对数据按测点排序即可。但实际上,每秒都会有20万条新数据,如果每次获取几秒热数据都与历史数据整体排序,即使不考虑排序本身的时间,仅是重写一遍的时间上都很长。
分层存储
如果将数据分层处理,就可以较好的解决冷热数据之间的矛盾。
分层是指按不同时间周期把数据划分为多个层级存储。这里我们可以分为4层,每层的数据都按测点、时间有序存储。采集的数据每10秒保存一次,作为第一层;每10分钟将第一层的数据合并到一起,作为第二层;每2小时将第二层的数据合并成第三层;每天将第三层的数据合并成第四层。由于每层数据都按测点、时间有序,所以从第二层开始,都可以使用快速归并方法进行合并,得到下一层数据。
这样分层的原因有以下两点:
小内存低延迟
系统允许10秒延迟,所以从消息队列中每持续获取10秒数据保存一次,既可以满足延迟要求,同时也不会占用过多内存。数字化后的测点名为int类型,一条测点数据需要20字节,10秒的20万测点数据约占用38M(10*200000*20/1024/1024)。
高效统计
假设只有第一层,即每10秒把数据按测点有序存储,一天将产生8640个10秒的数据文件。这时,如果统计条件中的时间段为30秒,可以较为精确地找出误差不大于10秒的数据文件(最多只要打开4个10秒文件)。但是当查询时间区间为1天时,就需要打开至少8640个文件,这显然太多了,会严重影响效率。所以需要将数据再合并到更高层(数据文件的时间跨度更长)。
合并数据时,要权衡两个因素:单个数据文件内的时间跨度和统计时需要用到的文件个数。跨度越小,则根据统计条件中的时间段越容易直接选出有效文件;文件个数越多,查找数据时,打开次数也就越多。两者是矛盾的,需要按实际数据规模测试找出折中点。
按上述的层级划分,即使查询区间为一天的数据,最坏情况只会涉及不到100个文件,即从“当前时间点减10秒”到“当前时间点减86410秒”,原因是低层级的文件时间跨度都比较短,所以文件个数也较多,第一层的文件数(60多个),第二、三层(分别约12个)。进行一天前的离线计算时,则只需要打开最多两个第四层按天保存的文件即可。当遇到更长查询区间,比如半年的测点数据统计,只需要打开180个左右的天层级文件,性能损失也不是很大。
采用以上分层方案后,还会给数据维护带来好处。因为系统需要将非常早期的数据删除(比如一年前的),每天直接删除不需要的天层级文件即可。否则,所有冷数据都按测点排序,维护时,删除早期数据会导致重写一遍所有数据。
分层统计
分层后的冷热数据属于不同的数据源,计算也变得复杂些,需要独立计算同源数据的结果后,再将结果合并起来,算出最终的统计结果。例如计算平均值时,需要先分别计算出冷热测点的数量与和,再把冷热两部分的中间结果合起来再计算出均值。而计算方差、中位数时,就只能把所有测点数据全都读到内存再统计的。100个测点一天的数据量,总共也不超64m,内存完全装得下。
实践过程
多线程设计
按上述分析,采集的数据每10秒就要保存一次,即第一层的每轮数据写入需要保证在10秒内完成。这时候就不可能来得及再做高层数据的合并了,因为高层级文件的时间跨度较大,合并动作耗时也就会较长(大于10秒)影响写入,所以数据的分层存储需要使用两个线程实现。
一个线程负责采集数据并写入到第一层文件,每10秒写入一次;另一个线程负责周期性将低层级文件合并为高层级文件,即每隔10分钟将第一层文件(60个10秒文件)合并到第二层。更高层级的合并会以10分钟为周期,根据低层级文件数量判断是否需要执行合并,例如第三层的周期为2小时,即当第二层文件数量大于等于12(12个10分钟)时,将第二层文件合并为第三层,第四层类似(12个2小时)。这两个线程在系统初始化时执行并持续运行。
统计任务则是另外的独立线程,当应用端发起统计请求时,调用计算脚本完成计算。
统计时,需要先列出当前时刻有效的(合并前的低层级文件或合并后的高层级文件)数据文件(文件名中包含了时间信息),再打开文件查找出数据进行统计;而每次合并后,也需要删除已经被合并的低层级数据文件。又因为操作系统对文件的操作并不是原子级的,所以可能发生统计任务中列出有效的低层级文件还未被读取但已经被删除的情况。
全程变量和锁
使用全程量与锁,可以避免上述操作系统非原子级操作文件带来的隐患。
设置全程量QueryFilesList(以下简称qfl),用于记录数据写入、合并后的有效文件信息。用一个序列的序列(二维数组)表示,外层序列长度和层数相同,内层序列为每层的有效文件信息。在写入线程中,每轮采集数据并写入第一层文件后,在1层级序列中追加本轮新写入的文件信息;在合并线程中,当低层级文件合并到高层级后,在高层级对应序列追加合并后的新文件信息。
另外还要有一个全程量DeleteFilesList(以下简称dfl),用于记录已经被合并的待删除的低层级文件信息。同样地,用一个序列的序列表示,外层序列长度比层数少1,因为天层级的不用删,内层序列为待删除文件信息序列和该层待删除标记时间戳(秒)。在写入线程中,每一轮读取该全程量,找出当前时间戳减去待删除标记时间戳大于等于最大查询时间(例如30秒)的所有层级文件信息,删除这些文件后,将对应层级设为null即可(删除周期必定小于最小合并周期);在合并线程中,当低层级文件合并到高层级后,把被合并的低层级文件信息和当前时间戳写入对应层级即可。
在操作以上两个全程量时,均需要加上锁,保证文件操作的原子性。简单起见,锁的名称和全程变量相同。
数据存储
实现场景是个kafka,但搭建比较麻烦。这里我们写一段脚本来模拟取数。
1、模拟生成数据
data_gen.splx
A |
B |
|
1 |
=s=long(elapse@s(s,-m))\1000 |
|
2 |
>n=200000 |
|
3 |
for m |
=n.new(~:id,s+(A3-1)*1:dt,10:type,0:quality,rand(9999)/100:val) |
4 |
>B1|=B3 |
|
5 |
return B1 |
此脚本用来模拟生成数据,每秒20万条测点记录。
调用该脚本返回时刻s(日期时间)前m(自然数)秒数据。
2、写入线程
write.splx
A |
B |
|
1 |
>output("write_start...") |
|
2 |
=now() |
|
3 |
=long(elapse@s(A2,-10)) |
|
4 |
=long(elapse@s(A2,-30)) |
|
5 |
=lock("dfl") |
|
6 |
=file("dfl.btx").import@bi() |
|
7 |
=A6.pselect@a(~ && A4>=~(1)) |
|
8 |
=A6(A7).conj(~(2)).(movefile(~)) |
|
9 |
>A6(A7)=null |
|
10 |
=file("dfl.btx").export@b(A6) |
|
11 |
=lock@u("dfl") |
|
12 |
for |
=now() |
13 |
=call("data_gen.splx",B12,10).sort(id,dt) |
|
14 |
=B13.max(dt) |
|
15 |
=file("l1"/B14/".btx").export@b(B13) |
|
16 |
=lock("qfl") |
|
17 |
=file("qfl.btx").import@bi() |
|
18 |
>B17(1)|=B14 |
|
19 |
=file("qfl.btx").export@b(B17) |
|
20 |
=lock@u("qfl") |
|
21 |
>output("w:"/(10000-interval@ms(A2,now()))) |
|
22 |
=sleep(10000-interval@ms(A2,now())) |
|
23 |
goto A1 |
此脚本将模拟生成的待获取数据存储第一层数据
A5-A11删除30秒(最大查询时间)前的已经被合并了的文件信息
B13模拟读取源数据(10秒的数据量)
B16-B20对全程量qfl追加本轮存储的第一层文件信息,即当前批数据的最大时间戳(秒)
3、合并线程
merge.splx
A |
B |
C |
|
1 |
>output("merge_start...") |
||
2 |
=now() |
||
3 |
=file("qfl.btx").import@bi() |
||
4 |
=A3.m(:-2) |
||
5 |
=A4.(if(#==1,~.("l1"/~/".btx"),if(#==2,~.("l2"/~/".ctx"),if(#==3,~.("l3"/~/".ctx"))))) |
||
6 |
for A5 |
if #A6==1 && A6 !=[] |
|
7 |
=A6.(file(~).cursor@b()).merge(id,dt) |
||
8 |
=file("l2"/A4(#A6).m(-1)/".ctx").create(id,dt,type,quality,val) |
||
9 |
=C8.append@i(C7) |
||
10 |
>C8.close() |
||
11 |
=lock("qfl") |
||
12 |
=file("qfl.btx").import@bi() |
||
13 |
=C12(2)|=A4(#A6).m(-1) |
||
14 |
=C12(1).delete(A6.len().()) |
||
15 |
=file("qfl.btx").export@b(C12) |
||
16 |
=lock@u("qfl") |
||
17 |
=lock("dfl") |
||
18 |
=file("dfl.btx").import@bi() |
||
19 |
>C18(1)=[long(now()),A6] |
||
20 |
=file("dfl.btx").export@b(C18) |
||
21 |
=lock@u("dfl") |
||
22 |
else if #A6==2 && A6.len()>=12 |
||
23 |
=A6.(file(~).open().cursor()).merge(id,dt) |
||
24 |
=file("l3"/A4(#A6).m(-1)/".ctx").create(id,dt,type,quality,val) |
||
25 |
=C24.append@i(C23) |
||
26 |
>C24.close() |
||
27 |
=lock("qfl") |
||
28 |
=file("qfl.btx").import@bi() |
||
29 |
=C28(3)|=A4(#A6).m(-1) |
||
30 |
=C28(2).delete(A6.len().()) |
||
31 |
=file("qfl.btx").export@b(C28) |
||
32 |
=lock@u("qfl") |
||
33 |
=lock("dfl") |
||
34 |
=file("dfl.btx").import@bi() |
||
35 |
>C34(2)=[long(now()),A6] |
||
36 |
=file("dfl.btx").export@b(C34) |
||
37 |
=lock@u("dfl") |
||
38 |
else if #A6==3 && A6.len()>=12 |
||
39 |
=A6.(file(~).open().cursor()).merge(id,dt) |
||
40 |
=file("l4"/A4(#A6).m(-1)/".ctx").create(id,dt,type,quality,val) |
||
41 |
=C40.append@i(C39) |
||
42 |
>C40.close() |
||
43 |
=lock("qfl") |
||
44 |
=file("qfl.btx").import@bi() |
||
45 |
=C44(4)|=A4(#A6).m(-1) |
||
46 |
=C44(3).delete(A6.len().()) |
||
47 |
=file("qfl.btx").export@b(C44) |
||
48 |
=lock@u("qfl") |
||
49 |
=lock("dfl") |
||
50 |
=file("dfl.btx").import@bi() |
||
51 |
>C50(3)=[long(now()),A6] |
||
52 |
=file("dfl.btx").export@b(C50) |
||
53 |
=lock@u("dfl") |
||
54 |
>output("m:"/(600000-interval@ms(A2,now()))) |
||
55 |
=sleep(600000-interval@ms(A2,now())) |
||
56 |
goto A1 |
此脚本将低层合并到高层
A6循环每个分层,若当前层为第一层且该层有数据文件,则把当前层文件中的数据合并到第二层并更新全程量qfl与dfl(C7-C21),若当前层为第二层且该层有至少12个数据文件,则把当前层文件中的数据合并到第三层并操作全程量qfl与dfl(C23-C37),若当前层为第三层且该层有至少12个数据文件,则把当前层文件中的数据合并到第四层并操作全程量qfl与dfl(C39-C53)
A55与A2每隔10分钟,从A1再次执行以上代码
4、初始化
init.splx
A |
B |
|
1 |
=file("qfl.btx") |
|
2 |
if(A1.exists()) |
=A1.import@bi() |
3 |
else |
>B2=4.([]) |
4 |
=A1.export@bi(B2) |
|
5 |
=file("dfl.btx") |
|
6 |
if(A5.exists()) |
=A5.import@bi() |
7 |
else |
>B6=3.(null) |
8 |
=A5.export@bi(B6) |
|
9 |
=call@n("write.splx") |
|
10 |
=call@n("merge.splx") |
此脚本为初始化脚本
qfl.btx保存全程量qfl,即有效文件名,用于低层向高层的合并和统计计算
dfl.btx保存全程量dfl,即已被合并过的文件名,用于定期(每轮取数时),满足当前时间减去被合并后时间大于最大查询时间,则删除
计算脚本
以平均值为例:
A |
B |
|
1 |
=rand@s(1) |
|
2 |
=200000.sort(rand()).to(100).sort() |
|
3 |
=st=long(elapse@s(now(),-86410))\1000 |
|
4 |
=et=long(elapse@s(now(),-10))\1000 |
|
5 |
=file("qfl.btx").import@bi().m(2:) |
|
6 |
=A5.(~.pselect(~>=st)) |
|
7 |
=A5.(~.pselect(~>=et)) |
|
8 |
=A5.(~.m(A6(#):if(!A7(#),A5(#).len(),A7(#)))) |
|
9 |
=A8.(if(#==1 && ~,~.("l1"/~/".btx"),if(#==2 && ~,~.("l2"/~/".ctx"),if(#==3 && ~,~.("l3"/~/".ctx"),if(~,~.("l4"/~/".ctx")))))) |
|
10 |
=ctxFiles=A9.m(2:).conj() |
|
11 |
>ctxResult=null |
|
12 |
for A2 |
|
13 |
=ctxResult|=ctxFiles.(file(~).open().cursor(id,dt,val;id==A12).select(between@r(dt,st:et))).conjx().groups(id;count(~):cnt,sum(val):amt) |
|
14 |
fork A9(1) |
|
15 |
=file(A14).iselect@rb(A2,id).select(between@r(dt,st:et)).fetch() |
|
16 |
=btxResult=A14.conj().sort(#1).groups(id;count(~):cnt,sum(val):amt) |
|
17 |
=[ctxResult,btxResult].conj().group(id).new(id,~.sum(amt)/~.sum(cnt):avg) |
A2随机从20万个测点中取100个测点并排序
A3为查询时间区间的起始时间戳(秒),其中的-86410为当前时间的前一天(多了10是因为数据有10秒延迟)
A4为查询时间区间的结束时间戳(秒),其中的-10为当前时间(-10是因为数据有10秒延迟)
A5-A9从全程量qfl中初步过滤有效文件信息并拼成实际存储的文件名
A10-B13计算初步过滤后组表文件中满足时间区间的测点个数和val的总和
A14-A16计算初步过滤后集文件中满足时间区间的测点个数和val的总和
A17将组表和集文件计算出的中间结果合并后再统计最终结果
要改变查询区间测试不同情况,可以手动修改A3、A4中的秒数针对性调整。
编号化及还原
以上代码在模拟数据时,是按id已经序号化来写的,实际上先要做转换再存储,完成计算后还要还原。具体介绍可以参考:数据转存时的整数化
实践效果
SPL使用单机(8C64G),可以及时把每秒20万测点数据存储下来,并且计算总时间区间为最新的1天数据(涉及60个10秒层级文件,12个10分钟层级文件,11个2小时层级文件),平均值计算耗时为22秒;某1天的冷数据(一个天层级文件),平均值计算耗时仅为2秒。即只用一台机器就达到了期望 ,硬件资源降到最低。
后记
传统数据库的功能比较单一,只能解决一个环节的问题,比如内存数据库解决热数据问题,大数据平台解决冷数据。而当前问题需要多种技术组合,如果运用多种产品混合实现,又会带来架构的复杂性,增加系统的风险。而且业界中的大数据库产品的架构也较为死板,对存储层基本不提供可编程能力,很难基于这些产品实现某些特殊设计的方案。
相比之下,集算器则拥有开放的技术架构和强大的编程能力(SPL语言),可以被深度控制,从而实现各种因地制宜设计的方案。
英文版