实时报表 T+0 的实现方案
【摘要】
基于数据库系统的 T+0 全量实时查询,在数据量很大时一般只能进行数据库扩容(包括分库手段),成本高昂;如果采用文件系统和生产数据库混合运算,就可以实现低成本高性能的 T+0 查询,而热导出机制则是这个方案的基础!让我们一起去乾学院看个究竟吧:实时报表 T+0 的实现方案!
一 问题背景
在报表的应用系统中,用户越来越关注数据的实时性,希望最新发生的数据能在报表中体现出来,也就是我们常说的T+0场景, 以此及时辅助决策、驱动运营。
比如交通大数据应用的场景:需要结合实时数据了解车辆通行密度,合理进行道路规划,同时根据历史数据预测线路拥堵情况、事故多发地提醒等等。
但常规的方案:报表+数据仓库+ETL工具很难实现此类实时报表,往往只能看到昨天、上周甚至是上个月的情况,也就是T+1、T+7、T+30等,我们统称为T+n报表。
究其原因,困难大概体现在如下三个方面:
1、如果报表的历史数据和最新数据都从生产系统读取,虽然可以实现T+0报表,但是会对生产数据库造成压力,当数据量越来越大时,产生性能瓶颈,直接影响业务;并且大量的历史数据会占用高昂的数据库成本(存储成本和性能成本)。
2、如果采用数据仓库的方式,那么ETL从生产库中取出数据,需要较长的“窗口时间”,一般是业务人员下班之后,到第二天早上上班之前,所以能看到的最新数据也只能是T+1。
3、虽然理论上可以从历史库中和生产库中同时取数据形成实时报表,但是一般的报表工具都不具备跨库混合计算的能力,其他的跨库计算方案又比较复杂,难以实施,并且性能较低。
二 解决思路
那么,是否有成本更低、实施起来更简单的T+0报表方案呢?下面将要介绍的润乾集算器,就是这样一款利器,利用集算器的混合数据源能力就能实现低成本的T+0实时报表。
实现思路:把不再发生变动的大量历史数据采用数据文件存储,仅从生产库读取少量新数据,在保证报表实时性的同时,降低了历史数据存储的成本,减少了报表系统对生产数据库造成的负载。
下图显示了常规T+n方案和集算器T+0方案的结构对比,应该说,引入集算器后,减少了很多不必要的成本和多余的组件,整个体系架构也变得更加清新与合理了:
上图新处理方式体系结构中的”导出(非实时)”是指在非工作时间(例如晚上),定时将生产数据库的新增数据同步到存储历史数据的文件中;
关于数据外置方案、设计数据存储组织、定时任务等相关准备和外围工作,具体做法可参考<<基于文件系统实现可追加的数据集市>>的相关章节,这里不再赘述。
三 混合运算场景
下面,我们就通过制作“实时流程工站不良柏拉图”这个例子,来看一下集算器是如何利用历史数据结合当期数据进行混合运算,实现T+0方案的。报表最终的展示效果如下图:
这张报表清楚地显示了电子设备在生产过程中,80%的问题是由20%的原因造成的,对于找出产生大多数问题的关键原因很有优势。
报表中数据的查询过程是:根据选择开始日期、结束日期进行过滤查询;先按照不良代码分组,统计汇总每个分类的不良数量,并按照汇总数量降序,然后计算出不良累计比率(算法为“(不良数量累计汇总/总不良数量汇总)*100”)。报表上部的查询按钮是报表工具提供的“参数模板”功能,具体做法参见教程,这里不再赘述。
3.1编写数据查询脚本
我们假定已经将变化不大的历史数据搬出了数据库,采用集文件(集文件利用集算器提供的压缩格式,具有更好IO性能)存储,命名为MES-pre.btx,同时每天定时执行数据同步脚本,把前一天的数据追加到当前数据文件中;查询涉及的当天少量数据直接从生产数据库(demo)取出,以此保证数据的实时性。集算器SPL脚本如下(也支持仅查历史数据的情况):
A |
B |
|
1 |
=connect("demo") |
=A1.query("SELECT code,name FROM watch") |
2 |
=A1.cursor@x("SELECT code,nums FROM meta_resource WHERE "+if(Efiledate>=date(now()),"DATE_FORMAT(fildate,'%Y-%m-%d')>='"+string(date(now()))+"'","1=0")) |
|
3 |
=file("D:/PT/MES-pre.btx").cursor@b().select(filedate>=Bfiledate && filedate<Efiledate) |
=[A2,A3].conjx() |
4 |
=B3.groups(code:不良代码,code:不良名称;sum(nums):不良数量,sum(nums):不良累计数量,sum(0):不良累计比率) |
|
5 |
>A4.switch(不良名称,B1:code) |
=A4.sort(不良数量:-1) |
6 |
>B5.run(不良累计数量+=不良累计数量[-1]) |
=A4.sum(不良数量) |
7 |
>B5.run(不良累计比率=round(不良累计数量/B6*100,2)) |
=B5.new(不良代码,不良名称.name:不良名称,不良数量,不良累计数量,不良累计比率) |
8 |
return B7 |
A1:连接预先配置好的生产数据库(demo)
B1:查询字典表,不良代码、不良名称
A2:建立数据库游标,用简单的sql读取数据表的数据。Sql的过滤条件部分会根据逻辑判断进行动态拼接,当结束日期>=当前系统日期时,代表查询当天的实时数据,否则做一次结果为空的查询动作,以适应只查历史数据的业务场景。@x选项是指读完数据库后关闭连接。
A3:建立数据文件D:/PT/MES-pre.btx的游标。文件游标允许分批从大数据文件中读取数据,从而避免内存溢出。@b选项是指按照集算器提供的二进制格式来读取文件,同时根据传入的开始日期(Bfiledate)、结束日期(Efiledate)过滤出符合条件的记录
B3:将数据库游标(新数据)和文件游标(历史数据)合并
A4:利用groups函数,完成对合并后游标的分组汇总,同时多构造了几列:不良名称、不良累计数量、不良累计比率,方便后面的赋值计算。
A5:通过switch()函数在A4结果的”不良名称”字段上建立指向B1表中code字段的指针引用记录,实现关联,如下图:
B5:按照不良数量降序排列。如下图:
A6:计算不良累计数量;可以看到,集算器用“不良累计数量[-1]”来表示上一行的不良数量,可以轻松进行相对位置的计算。
B6:对不良数量进行总计
A7:计算出不良累计比率,算法为“(不良数量累计汇总/总不良数量汇总)*100”,同时保留两位小数,计算结果如下图:
B7-A8:取出需要的字段,将关联了不良名称后的结果集返回给报表工具,如下图:
3.2作为报表数据源
在利用集算器完成了数据查询工作后,为了在报表中使用查询结果,可以在报表中直接将集算器设置为数据源,用法和使用数据库一样简单,具体做法如下:
l 在报表中定义参数(Bfiledate、Efiledate),
l 设置集算器数据集,并传递报表参数,
l 设计报表统计图
如下图所示:
完成报表设计后,输入参数进行计算,就可以得到希望的报表了。
四 数据预处理
上一章节中,通过对历史数据(文件)和实时数据(数据库)进行混合计算,就能够轻松实现实时报表(T+0)方案;而为了做到这一点,相应的数据预处理,包括怎么导出到文件、设计怎样的存储组织等,也就显得尤为重要了。
接下来将讨论历史数据导出到文件的几种模式及优缺点分析:冷导出、折中办法、热导出。
4.1冷导出
关于用文件存储历史数据能够带来的诸多好处,可以参考<<基于文件系统实现可追加的数据集市>>的相关章节,这里不再赘述。
所谓冷导出,就是允许有一段 “时间窗口”,能够从生产库取出历史数据追加导出到文件中。例如每天的凌晨2-6点为定时执行任务的时间窗口。
冷导出的缺点也很明显,在追加数据导出到文件的这段时间里,这个文件是不可读的,也就是说相关的查询也无法进行了,所以,从本质上说,冷导出并没有真正意义上做到T+0实时查询(生产系统不停机,查询系统也不停机)。
不过,这里顺便解释一下:如果使用另一个数据库存储历史数据,就不会有这样的问题。原因在于关系型数据库支持事务一致性,数据写入的同时仍然可以很好地支持查询。当然这样做肯定也会牺牲一部分性能,当每天导出的数据量较多时对资源占用相当巨大(因为数据库回滚段会很大)。
所以一致性和高性能在一定程度上是矛盾的。数据库虽然有一致性,但数据库本身太慢太贵;而集算器(集文件)可以获得高性能,但没有事务一致性,在维护数据的同时不能参与其他计算。
不过,在对业务场景要求不是很高的情况下,冷导出也是够用了,下面我们还是简单举例说明一下如何编写集算器脚本,获取昨天的历史数据追加到当前集文件中,代码如下:
A |
B |
|
1 |
=file(“D:/PT/MES-pre.btx”) |
=connect("demo") |
2 |
=B1.cursor@x("SELECT * FROM meta_resource WHERE DATE_FORMAT(fildate,'%Y-%m-%d')=?", after(date(now()),-1)) |
>A1.export@ab(A2) |
A1:按路径打开需要导出的集文件路径
B1:连接数据库(demo)
A2:根据sql创建数据库游标,获取昨日数据,参数为昨天日期; @x选项是指读完数据库后关闭
B2:执行结果追加写入到集文件
4.2折中办法
针对”冷导出”方案的不足,比较容易想到的折中办法就是:历史数据不再按照追加的模式写入到一个集文件中,而是把文件拆开,让彼此之间的耦合度更低,互不影响。这样做的话,就需要考虑以下两条规则:
1、每天导出一个独立的集文件,可以用年月日命名,这样导出过程中,就不会影响对已导出的历史数据的查询。
2、在查询脚本中增加时间范围判断,规避掉导出的”时间窗口”;比如定时任务的时间窗口为每天凌晨2-6点;在查询脚本中,可以根据查询动作的当前时间点进行逻辑判断,如果查询发生在当天6点以后,说明数据导出已经完成,那么数据来源就是集文件(到昨天为止的历史数据)+当前数据库(到今天当前时间点的新数据),若查询发生在当天6点以前的,那就是集文件(到前天为止的历史数据)+当前数据库(昨天到今天当前时间点的新数据)。
这种办法的缺点就是在设计数据存储组织时,文件会分的比较碎,逻辑判断部分的代码也会显得比较冗长,而文件管理也会麻烦一些。但不管怎样,还是能够达到要求,实现真正意义上的实时报表(T+0)方案。下面介绍一下实现步骤。
4.2.1设计数据存储组织
历史数据按照业务模块进行划分,每天数据存一份集文件。目录结构为:/业务模块/数据明细表/年月日文件名,如下图所示:
4.2.2同步昨天数据到文件
改造“冷导出”方案中数据导出脚本,从数据库中获取昨天的历史数据每天存一份集文件,用年月日命名,代码如下:
A |
B |
|
1 |
=file(“D:/PT/”+string(after(date(now()),-1),"yyyyMMdd")) |
=connect("demo") |
2 |
=B1.cursor@x("SELECT * FROM meta_resource WHERE DATE_FORMAT(fildate,'%Y-%m-%d')=?", after(date(now()),-1)) |
>A1.export@b(A2) |
A1:按路径打开需要导出的集文件路径,每天一个,用年月日命名
前面已经解释过的格子的代码这里不再赘述。
4.2.3数据查询
首先,我们需要写一个工具脚本,主要功能是能够根据传入的开始日期、结束日期,过滤出需要查询跨度范围的多个集文件路径,同时判断路径下的集文件对象是否存在。脚本命名为:判断读取文件的范围.dfx,编写代码如下:
A |
|
1 |
=if(endDate>=date(now()),if(now()>datetime(concat(date(now())," 06:00:00")),after(endDate,-1),after(endDate,-2)),endDate) |
2 |
=periods(startDate,A1,1) |
3 |
=A2.(path+string(~,"yyyyMMdd")) |
4 |
=A3.select(file(~).exists()) |
5 |
return A4 |
脚本接收3个参数,开始日期(startDate),结束日期(endDate),集文件的存储路径(path)
A1:当传入的结束日期>=当前系统日期时,并且当前时间是在当天6点之后的,返回昨天日期,在当天6点之前的,返回前天日期,否则就返回传入的实际结束日期
A2:根据开始日期,计算后的结束日期,默认按天间隔获取日期范围
A3:循环A2,通过集文件的存储路径与该日期段内的年月日进行拼接,利用string()函数进行格式化
A4:判断路径下的文件是否真实存在,由A5返回实际存在的文件路径,最终结果如下图:
然后,我们需要对前面章节中“混合运算场景”数据查询的脚本做一些改造,值得注意的是这里将采用多路游标的概念,将多个游标合并成一个游标使用,改造后的脚本如下:
A |
B |
C |
|
1 |
=connect("demo") |
=A1.query("SELECT code,name FROM watch") |
|
2 |
=A1.cursor@x("SELECT code,nums FROM meta_resource WHERE "+if(Efiledate>=date(now()),if(now()>datetime(concat(date(now())," 06:00:00")),"DATE_FORMAT(fildate,'%Y-%m-%d')>='"+string(date(now()))+"'","DATE_FORMAT(fildate,'%Y-%m-%d')>='"+string(after(date(now()),-1))+"'"),"1=0")) |
||
3 |
=call("D:/PT/判断读取文件的范围.dfx",Bfiledate,Efiledate,"D:/PT/数据表A/") |
||
4 |
=A3.(file(~).cursor@b()) |
=(A2|A4).mcursor() |
|
5 |
=B4.groups(code:不良代码,code:不良名称;sum(nums):不良数量,sum(nums):不良累计数量,sum(0):不良累计比率) |
||
6 |
>A5.switch(不良名称,B1:code) |
=A5.sum(不良数量) |
=A5.sort(不良数量:-1) |
7 |
>C6.run(不良累计数量+=不良累计数量[-1]) |
>C6.run(不良累计比率=round((不良累计数量/B6)*100,2)) |
|
8 |
=C6.new(不良代码,不良名称.name:不良名称,不良数量,不良累计数量,不良累计比率) |
||
9 |
return A8 |
前面已经解释过的格子代码这里不再赘述。
A2:建立数据库游标,根据逻辑判断动态拼接sql,当查询的结束日期>=当前系统日期时,并且当前查询时间点是在当天6点之后的,只查询当天的实时数据,当前查询时间点发生在当天6点之前的,查询返回昨天和当天的实时数据;否则都不满足的情况下,做一次结果为空的查询动作,适应只查询历史数据的业务场景。
A3:调用”判断读取文件的范围.dfx”,传入脚本参数开始日期、结束日期的值,获得起止日期内的所有集文件的集合
A4:循环A3,分别打开每个集文件对象,根据文件创建游标,其中cursor()函数使用@b选项代表从集文件中读取。
B4:利用集算器提供的多路游标概念,把数据结构相同的多个游标合并成一个游标使用。使用时,多路游标采用并行计算来处理各个游标的数据,可以通过设置cs.mcursor(n) 函数中的n来决定并行数,当n空缺时,将按默认自动设置并行数
A9:最后返回结果集给报表工具使用
4.3热导出
所谓热导出,是相对于冷导出而言的。热导出要保证查询系统永不停机,在导出数据的过程中有查询请求进来,依然能够工作。热导出一般适用于实时查询场景要求较高的情况。
4.3.1实现思路
热导出需要利用文件的备份机制结合数据库的一致性来实现热切换动作。为了便于理解,可参考以下逻辑图:
首先,在数据库中建备份表,主要目的是为了记录当前正在使用的是哪个备份文件,以及从DB中取的热数据的日期范围,查询系统启动时把这个表清空。
其次,导出历史数据到集文件A,同时备份一个文件B,然后在数据库备份表中记录该文件A,以及设定从DB中取的热数据的日期(比如某个时刻之后);这个动作在系统初始化运行时,只做一次。
然后,设计数据查询的流程:
1、在数据库中建状态表,当数据查询时,先从备份表中查出可用的备份是哪个文件以及热数据的日期范围,然后加入一条记录到状态表中,表明该备份文件正有一个查询,当查询完成后将在状态表中把这条记录删除,可以用自增列的方式。
再次,设计数据导出到集文件的流程:
1、每天凌晨2点执行定时任务,先同步历史数据追加到文件B上,当导出完成后,修改数据库备份表的记录为使用文件B,同时修改从DB中取热数据的范围,以后新产生的查询动作都将使用文件B
2、检查并等待状态表中A的使用记录都已清空(基于A的所有查询都结束了),这时才会同步历史数据追加到文件A上,否则每等待1分钟就循环检查一次。
3、当步骤2的数据追加完成后,再修改数据库备份表为使用文件A,以后的新产生的查询又会回到了使用文件A,从而达到热切换的动作。
4、直到等待状态表中B的使用记录都清空(基于B的查询也都结束了)
5、整个过程执行完成,可以等待下一轮导出
这里需要特别说明的是,备份表、状态表必须用数据库作为媒介,从而利用数据库的一致性;不能用文件记录备份表、状态表的内容,因为文件无法保持一致性,当多任务并发时可能就乱了。
4.3.2数据查询
第一步,在数据库中定义”备份表”,包含三个字段(文件名称/边界时间/标识),同时定义”查询状态表”,包含三个字段(唯一标识/文件名称/当前系统时间,其中定义唯一标识为自增列),数据结构分别如下图示:
第二步,通过集文件A备份一个集文件B,然后在“备份表”中记录可查询的备份文件为A,并设定从DB中取的热数据边界时间(定义为每日的零点),此步操作如果用集算器脚本执行,样例代码如下:
A |
B |
|
1 |
=movefile@c(file("D:/PT/MES-A.btx"),"D:/PT/MES-B.btx") |
=connect("demo") |
2 |
>B1.execute("INSERT INTO backup (name,crashtime,flag) VALUES (?,?,?)","A",date(now()),"WORKING_STATUS") |
>B1.close() |
A1:根据导出的集文件A,复制备份同样的文件B
A2:备份文件B完成后,往数据表中写入当前可用的集文件A,当前系统时间(零点),给定标识列为:WORKING_STATUS
第三步,我们需要对前面章节中“混合运算场景”数据查询的脚本做一些改造,改造后的脚本如下(此例中也支持仅查历史数据的情况):
A |
B |
|
1 |
=connect("demo") |
=A1.query("SELECT code,name FROM watch") |
2 |
=A1.query@1("SELECT NAME,crashtime FROM BACKUP WHERE flag='WORKING_STATUS'") |
=name=A2(1),crashtime=A2(2) |
3 |
=A1.cursor("SELECT code,nums FROM meta_resource WHERE "+if(Efiledate>=date(crashtime),"DATE_FORMAT(fildate,'%Y-%m-%d')>='"+string(date(crashtime))+"'","1=0")) |
>A1.execute("INSERT INTO status (name,time) VALUES (?,?)",name,now()),uniques =A1.query@1("SELECT @@identity") |
4 |
=file(concat("D:/PT/MES-",name,".btx")).cursor@b().select(filedate>=Bfiledate && filedate<Efiledate) |
=[A3,A4].conjx() |
5 |
=B4.groups(code:不良代码,code:不良名称;sum(nums):不良数量,sum(nums):不良累计数量,sum(0):不良累计比率) |
|
6 |
>A5.switch(不良名称,B1:code) |
=A5.sort(不良数量:-1) |
7 |
>B6.run(不良累计数量+=不良累计数量[-1]) |
=A5.sum(不良数量) |
8 |
>B6.run(不良累计比率=round((不良累计数量/B7)*100,2)) |
=B6.new(不良代码,不良名称.name:不良名称,不良数量,不良累计数量,不良累计比率) |
9 |
>A1.execute("DELETE FROM STATUS WHERE uniques=?",uniques) |
>A1.close() |
10 |
return B8 |
前面已经解释过的格子代码这里不再赘述。
A2:根据标识WORKING_STATUS作为条件,查询出来当前可用的集文件名称,以及热数据取值的边界日期时间
B2:定义变量name, crashtime并赋值,便于后面单元格计算引用。
B3:此单元格做了两步动作,首先,写入一条记录到状态表中,表明该当前备份文件正有一个查询,其中uniques为自增列;接着在插入记录后,通过执行【SELECT @@IDENTITY】获取上一条插入语句中生成的自增长字段的值,赋值给变量uniques,便于A9查询时引用。数据库中的效果如下图:
A9:当查询完成后,根据变量uniques的值作为条件,在状态表中把这条记录删除,效果如下图:
4.3.3同步数据与热切换
改造“冷导出”方案中数据导出脚本,每天凌晨2点定时执行,代码如下:
A |
B |
C |
|
1 |
=file("D:/PT/MES-B.btx") |
=connect("demo") |
=file("D:/PT/MES-A.btx") |
2 |
=B1.cursor("SELECT * FROM meta_resource WHERE DATE_FORMAT(fildate,'%Y-%m-%d')=?", after(date(now()),-1)) |
>A1.export@ab(A2) |
>B1.execute("UPDATE BACKUP SET NAME = ?,crashtime=? WHERE flag ='WORKING_STATUS'","B",date(now())) |
3 |
for connect("demo").query@1x("SELECT COUNT(*) FROM STATUS WHERE NAME='A'")>0 |
>sleep(60*1000) |
|
4 |
=B1.cursor("SELECT * FROM meta_resource WHERE DATE_FORMAT(fildate,'%Y-%m-%d')=?", after(date(now()),-1)) |
>C1.export@ab(A4) |
>B1.execute("UPDATE BACKUP SET NAME = ?,crashtime=? WHERE flag = 'WORKING_STATUS'","A",date(now())) |
5 |
for connect("demo").query@1x("SELECT COUNT(*) FROM STATUS WHERE NAME='B'")>0 |
>sleep(60*1000) |
|
6 |
>B1.close() |
前面已经解释过的格子代码这里不再赘述。
C2:当历史数据同步追加到文件B上,修改数据库”备份表”的记录为使用文件B,同时修改从DB中取热数据的边界日期范围,执行结果如下图:
A3-B3:循环查询”状态表”中A的使用记录是否已清空,若发现还有基于A的查询没有结束,那就等待1分钟,然后接着循环,直到基于A的查询全部结束;
其中A3的数据库连接表达式需要特别说明一下:通常情况下,在B1单元格中已经定义了数据库连接,在A3中,可直接引用,写成:
for B1.query@1("SELECT COUNT(*) FROM STATUS WHERE NAME='A'")>0
不过,有些数据库在默认情况下做成了一次连接只处理一个事务,这样会导致A3在循环的时候结果不会变化,总是按照第一次查询出来的结果为主,比如第一次查询返回是true,当数据库发生变化了,它还是返回true,为了保险期间,可以写成如下格式:
for connect("demo").query@1x("SELECT COUNT(*) FROM STATUS WHERE NAME='A'")>0
这个属于数据库配置的范畴,可以通过数据库的连接参数来控制,这里不再详解。
C4:当文件A的数据追加完成后,再修改数据库”备份表”的记录为使用文件A,以后新产生的查询就会再使用文件A,执行结果如下图:
A5-B5:循环查询”状态表”中B的使用记录是否已清空,若发现还有基于B的查询没有结束,那就等待1分钟,然后接着循环,直到基于B的查询全部结束, 此轮整个导出过程全部完成,然后等待下一轮导出
五 总结
实时报表(T+0)的场景下,数据的热导出是个有些复杂的话题,不过,利用集算器(集文件)的备份机制结合数据库的一致性就可以轻松应对这类难题了,其中主要用到了以下两个优势:
1、跨库混合计算
集算器作为独立的计算引擎,可以并行指挥各个数据库分别计算,收集结果后再进行一轮汇总运算,然后向前端提交或者落地,从而可以很简单的实现T+0全量查询报表。
同时,在集算器跨库混合计算模型下,也不要求数据库是否同构,历史数据可以选择存储在成本更低的开源数据库中,例如Oracle和MySQL的混搭集群。
2、高性价比、高性能的集文件
无需构建数仓,将历史数据外置存放到文件系统中,不仅便于管理,而且可以获得更高效的IO性能和计算能力,从而很好的解决了关系型数据库中由于数据量大而导致的性能瓶颈和存储成本。
👍