SPL 实现电力高频时序数据实时存储统计

问题背景

发电设备中常常会放置传感器(DCS)来采集数据以监控设备运转的状况,某集团设计的电力监控统计系统,需要实时采集传感器的数据后保存,然后提供按时段的实时查询统计功能。

系统设计规模将支持20万个传感器(以下称为测点),采集频率为每秒一个数据,即每秒总共会有20万条数据,总时间跨度在1年以上。在这个基础上实现任意指定时段的多个测点数据统计,包括最大、最小、平均、方差、中位数等。

系统原结构图为:

系统中,用户期望的统计响应延迟为:从20万个测点中任取100个测点,统计频率最高可能每隔若干秒调用一次,从总时间跨度中统计任意一天的数据,预期执行时间在1分钟内,另外还会有少许离线任务,最长的时间段跨度长达一年。

现有的数据中台中没有计算能力,仅存储数据,计算时需要通过RESTful接口取出数据再统计。经测试,通过RESTful接口从数据中台取数,取出100个测点一天的数据量就需要10分钟时间,还没有开始计算,取数的时间已经远远超出了完成计算的预期时间。

基于现有结构,完成上述统计任务,性能上无法达到预期要求,需要将数据重新存储。

解决办法

第一步,梳理数据和计算要求

数据结构如下:

字段名

类型

中文名

TagFullName

字符串

测点名

Time

长整型

时间戳

Type

数值

数据类型

Qualitie

数值

质量码

Value

浮点数

数值

计算要求为:在每秒生成20万条记录的时序数据中,任意时间段内,从20万个测点中任取100个测点的数据,分别基于每个测点的数值序列统计最大、最小、方差、中位数等结果。

第二步,确定存储和计算方案

20万测点一天的数据,仅Value字段,就要200000*86400*4字节,至少64g内存,当总时间跨度为1年时,数据量会有数十T,单台服务器内存显然装不下。多台服务器集群,又会带来很高的管理和采购成本。

简单按时间为序存储的数据,可以迅速找到相应时间区间,但即使是这样,单个测点一天也有86400条记录,20万个测点共17.28亿条,每次统计都要遍历这个规模的数据,也很难满足性能要求。

那么测点号上建立索引是否可行?

索引只能快速定位数据,但这些数据如果在外存中不是连续存储的,硬盘有最小读取单位,会导致大量无用数据量读出,使得计算变得很慢,同样也无法满足性能要求。此外,索引占用空间会随着数据量增大而增大,并且插入数据的维护开销也更大。

如果数据可以按测点号物理有序存储,并在测点号上建立索引,相比时序物理有序存储,查找时,待查找的测点记录变得紧凑了,需要读入的块也就少了。100个测点的数据存成文本约300m不到,这样即使使用外存也可以满足性能要求。

只有历史冷数据时,处理起来比较简单,定时将数据按指定字段排序即可。但实际上,每秒都会有20万个测点的新数据,因为历史数据规模巨大,如果每次获取几秒热数据都与历史数据整体按测点号、时间排序,即使不算排序,仅是重写一遍的时间成本上都无法接受。

这时,需要将冷热数据区分对待。不再变化的冷数据可以按测点次序准备好。这里有一点变通,因为要将非常早期的数据删除(比如一年前的),如果所有冷数据都按测点排序时,会导致数据维护比较麻烦,删除早期数据会导致重写一遍所有数据。因此,可以设计为先按时间分段,每段时间内的数据按测点、时间有序,整体数据还是按时间有序。任务需求是按天计算,这里按天分段就比较合适,更长跨度的离线计算性能损失也不是很大。每当一天过去时,将昨天数据按上述规则排序后存储,当天的数据作为热数据处理。但是,当天内的数据量还是太大了,依然无法全部装入内存,还需要再分。

经过一些测试后确认,我们发现将数据按热度分为三层可以满足要求。第一层,十分钟内的热数据通过接口读入内存;第二层,每过10分钟,将过去10分钟的内存数据按测点、时间有序保存到外存;第三层,每过一天,将过去24小时内的所有每10分钟的数据按测点、时间有序归并。总数据为:一年的数据由365段每天数据,加144段当天数据和一段内存数据。

分层后的冷热数据属于不同的数据源,需要独立计算同源数据的结果后,再将结果合并起来,算出最终的统计结果。即使计算方差、中位数这种需要全内存统计的情况,100个测点一天的数据量,也只需要64m内存。

第三步,确定技术选型和方案

从上述的存储方案中得知,需要将实时数据按时间分段,段内按测点号、时间物理有序存储,常规数据库显然没办法做到这点。此外,拆分数据需要可以支持按自定义时间段灵活地拆分;数据存储时要具备高性能索引;冷热数据属于不同层(不在同一个数据源),计算时需要分别计算后再合并。

完成该任务,用Java硬编码工作量巨大,Spark写起来也很麻烦。开源的集算器SPL语言提供上述所有的算法支持,包括高性能文件、物理有序存储、文件索引等机制,能够让我们用较少的代码量快速实现这种个性化的计算

取数不能再用原系统的RESTful接口,也不合适直接通过APIDCS获取数据。用户方商定后引入kafka缓冲数据,屏蔽DCS层,同时还可以将DCS的数据提供给不同的消费者使用。变更后的系统结构图如下:

说明:

1DCS系统每秒推送20万个测点数据至Kafka MQ

2Kafka MQSPL:使用SPL基于Kafka API封装的Kafka函数,连接Kafka、消费数据。

3、内存缓冲:循环从Kafka消费数据(kafka_poll),每轮循环确保10秒以上的数据量,将每轮前10秒的数据补全后,按测点、时间序,保存成文件并读入内存。

4、分层数据文件:按不同时间段将冷热数据文件分层。

5、统计时将冷热数据混合计算。

6、支持每个测点名对应一个CSV文件作为数据源计算。

7、统计接口以HTTP服务方式供外部应用调用并将统计结果通过回调接口返回给外部应用。

第四步,实施优化方案

现有的RESTful接口取数太慢了,接口变为从kafka消费数据。存储数据时,将字符串类型的测点名数字化后保存,以获得更小的存储量和更好的运算性能。

在第二步中已经提到,数据量较大时,无法将数据都放在内存中计算,所以考虑采用冷热分层方案,将数据分为三层,每天的冷数据按测点号、时间有序(下文中的所有外存文件存储均采用该序,不再重复说明),用组表存储,因为大表对性能的影响很大,存储成组表有利于提升系统整体性能;当天的每10分钟的冷数据用,集文件存,因为集文件创建和使用都更简单,用来存储小表会很便捷,也不会因为索引块而降低存储效率;10分钟内的热数据从kafka直接读到内存,因为数据本身是通过kafka接口获取的,另外数据可能有一定的延迟,不适合每秒取数即写出。

测试后发现,10分钟内的热数据,从kafka获取后再解析json,不但需要消耗大量内存,而且解析json也需要花费很长的时间。这样在内存中直接加载热数据是没办法用来统计计算的,所以将热数据改为每10秒存成一个集文件。

接下来开始实现统计计算部分。每天组表中的冷数据计算较快,但是当天的144个集文件计算很慢。通过计算可以知道,每10分钟的数据量约1.2亿条记录,这个规模的数据可以用组表来存储,另外还可以再加一层每2小时一个组表文件,来减少当天总文件数的数量(从144个变成了24个)。实际上,计算时采用的二分查找是对单个文件内有序的测点号使用的,减少了文件个数,也就是减少了总查找次数。

最终,我们把数据分成了4层。第一层:延迟10秒的集文件热数据;第二层,每10分钟的组表冷数据;第三层,每2小时的组表冷数据;第四层,每天的组表冷数据。由于每层数据都按测点号、时间有序,所以每一层都可以用归并,快速生成下一层数据文件。

这时的冷数据计算已经很快了,可以满足实际使用,但是热数据的计算相比冷数据还是很慢。观察发现,热数据的所有集文件都加起来大约3G,不算很大,内存可以装下。实际测试,把文件读到内存中再查找相比直接外存文件查找可以快出好几倍。

已知的统计计算,分为最大值、最小值、中位数、方差、平均值等,不尽相同,但是之前的数据查找是一样的。都用二分法,找出对应测点号组的数据,再用时间过滤,即可得到相应的value值。

实测效果

经过几天时间的SPL编码、测试,优化的效果非常明显。优化之后的测试结果如下(耗时为毫秒):

测点数

时间段

10

50

100

10分钟

467

586

854

1小时

1739

3885

4545

6小时

2599

7489

13138

1

4923

16264

30254

说明:测试环境使用的机械硬盘,对并发计算不友好,更换为固态硬盘后,测试结果还会有较大的提升。

后记

解决性能优化难题,最重要的是设计出高性能的计算方案,有效降低计算复杂度,最终把速度提上去。因此,一方面要充分理解计算和数据的特征,另一方面也要熟知常见的高性能算法和存储方案,才能因地制宜地设计出合理的优化方案。本次工作中用到的基本高性能算法和存储方案,都可以从下面这门课程中找到:点击这里学习性能优化课程,有兴趣的同学可以参考。

传统数据库的功能比较单一,只能解决一个环节的问题,比如内存数据库解决热数据问题,大数据平台解决冷数据。而当前问题需要多种技术组合,如果运用多种产品混合实现,又会带来架构的复杂性,增加系统的风险。而且业界中的大数据库产品的架构也较为死板,对存储层基本不提供可编程能力,很难基于这些产品实现某些特殊设计的方案。

相比之下,集算器则拥有开放的技术架构和强大的编程能力(SPL语言),可以被深度控制,从而实现各种因地制宜设计的方案。