SPL 报表开发:轻量级低成本实时热数据报表方案

实时热数据报表,是指能够实时查询全量冷热数据的报表。早期业务只基于单个 TP 数据库时,这种报表并不是什么问题。但数据量大了,要拆分到专门的 AP 数据库后,就不一样了。因为冷热数据分离后,再做全量数据的实时报表,就会涉及多数据库的混合计算,而且 AP 库和 TP 库常常还是不同类型的,就更增加麻烦。

采用 HTAP 数据库融合 TP 与 AP 后,可以实现成早期单一数据库的架构,避免跨库混合运算。然而,迁移到新的 HTAP 数据库风险高、成本大,还不一定能复制原有 TP 和 AP 库的各种能力。

采用 Trino 等逻辑数仓引擎或者 Flink 等流处理引擎,再配合 kafka 传输数据,可以实现一定程度的实时同步以及多数据源混合计算。但是架构非常复杂沉重,会涉及元数据管理等诸多繁琐任务,涉及流计算的代码也不够简洁,同步实时程度较高时消耗的资源也较高,这会导致建设和应用成本都很高。

SPL 方案

..

SPL 通过原生数据源接口与 TP、AP 源连接,两边的数据分别各自处理后传输给 SPL 混合再计算,为报表提供实时计算结果。

SPL 有天然的多源混算能力,无论是 AP 库还是 Kafka 等流机制,无论是何种形式的数据仓库还是大数据平台,亦或者文件或云上对象存储,SPL 都可以直接访问读取并混合计算。

更详细地多源计算内容请参考: SPL 报表开发:不依赖逻辑数仓的轻量级多数据源报表

SPL 非常轻量,原来架构几乎不用动,既不涉及数据库迁移,也无需面临复杂的架构改动,只需将 SPL 集成(无缝)到报表应用中即可享受实时查询的便利。

而且,SPL 是临时从 TP 读取热数据再混合计算,这种结构对数据转储进 AP 库的实时性要求并不高,可以从容不迫地将数据做一些整理后再同步进 AP 库,以获得 AP 库更好的计算性能。

话不多说,我们来看一下 SPL 实现 T+0 计算的示例。

举个例

1. 业务场景

某电商平台希望实时监控用户的购买行为,并生成 实时商品销售转化率报表,以辅助营销策略调整。报表需展示以下指标:

  • 每小时内各商品的浏览量、购买量及转化率(购买量 / 浏览量)。

  • 每种商品实时的总销售额。

2. 报表目标

生成一张实时更新的报表,内容示例如下:

商品 ID 浏览量 购买量 转化率(%) 总销售额(元)
P001 500 100 20 15,000
P002 300 50 16.67 7,500

浏览量购买量:通过整合 实时用户点击流数据 和 实时订单数据获取。

总销售额:从 实时订单数据 和 历史订单数据 计算。

转化率:通过实时计算购买量与浏览量的比值获得。

3. 数据来源

TP 数据库(订单数据):存储最新的订单数据,支持事务性操作,实时写入。

Kafka 流数据:记录实时的用户行为数据(如用户浏览商品、加入购物车、完成结算等行为)。Hadoop 存储(历史订单数据):存储历史订单数据,采用 ORC 格式存储

(1)TP 数据库 - orders 表(存储最新的订单数据)

order_id user_id product_id quantity total_price order_time
1001 2001 3001 2 400.00 2024-12-19 10:15:00
1002 2002 3002 1 150.00 2024-12-19 10:20:00
1003 2001 3003 3 600.00 2024-12-19 10:25:00

(2)Kafka 流数据 - 用户行为数据(过去一小时)

product_id views
P001 500
P002 300

(3)Hadoop 存储 - his_orders 表(历史订单数据)

order_id user_id product_id quantity total_price order_time
1001 2001 3001 2 400.00 2023-12-01 12:10:00
1002 2002 3002 1 150.00 2023-12-01 14:00:00
1003 2001 3003 3 600.00 2023-12-01 16:30:00

4. 实现脚本

编写 SPL 脚本 realTimeOrder.splx(历史脚本从 Hive 中读取):


A
1 =connect("mysqlDB")
2 =A1.query@x("SELECT product_id, SUM(quantity) AS total_sales, SUM(total_price) AS total_revenue FROM orders WHERE order_time > now()-1h GROUP BY product_id") /实时订单数据
3 =kafka_open("/kafka/order.properties", "topic_order")
4 =kafka_poll(A3).value /实时用户行为数据
5 =kafka_close(A3)
6 =hive_open("hdfs://192.168.0.76:9000", "thrift://192.168.0.76:9083","default","hive") /连接 hive
7 =hive_table@o(A6) /获取 ORC 表及文件
8 >hive_close(A6)
9 =A7.select(tableName=="his_orders")
10 =file(A9.location).hdfs_import@c() /历史订单数据
11 =A10.select(interval(order_time,now())<=730)
12 =A11.groups(product_id;SUM(total_price):historical_revenue)
13 =join(A2:o,product_id;A4:v,product_id;A12:ho,product_id) /关联三部分数据
14 =A13.new(o.product_id:product_id,v.views:views,o.total_sales/v.views:conversion_rate,o.total_revenue+ho.historical_revenue:total_revenue)
15 return A14

历史数据可以也可以从 HDFS 中直接读取,脚本如下:


A
1 =connect("mysqlDB") /TP
2 =A1.query@x("SELECT product_id, SUM(quantity) AS total_sales, SUM(total_price) AS total_revenue FROM orders WHERE order_time > now()-1h GROUP BY product_id") /实时订单数据
3 =kafka_open("/kafka/order.properties", "topic_order") /Kafka
4 =kafka_poll(A3).value /实时用户行为数据
5 =kafka_close(A3)
6 =file("hdfs://localhost:9000/user/86186/orders.orc") /直接访问 HDFS
7 =A6.hdfs_import@c() /历史订单数据
8 =A7.select(interval(order_time,now())<=730)
9 =A8.groups(product_id;SUM(total_price):historical_revenue)
10 =join(A2:o,product_id;A4:v,product_id;A9:ho,product_id) /关联三部分数据
11 =A10.new(o.product_id:product_id,v.views:views,o.total_sales/v.views:conversion_rate,o.total_revenue+ho.historical_revenue:total_revenue)
12 return A11

整体脚本非常简单清晰。

5. 报表集成

SPL 与报表应用集成非常简单,只要将 SPL 的 esproc-bin-xxxx.jar 和 icu4j-60.3.jar 两个 jar 包引入到应用中(一般在 [安装目录]\esProc\lib 下),然后复制 raqsoftConfig.xml(也在上述路径下)到应用的类路径下即可。

raqsoftConfig.xml 是 SPL 的核心配置文件,名称不可更改。

在 raqsoftConfig.xml 中,需要配置外部库,这里用到的 Kafka:

<importLibs>
    <lib>KafkaCli</lib>
</importLibs>

同时 MySQL 数据源也需要配置在这里,在 DB 节点下配置连接信息:

<DB name="mysqlDB">
    <property name="url" value="jdbc:mysql://192.168.2.105:3306/raqdb?useCursorFetch=true"/>
    <property name="driver" value="com.mysql.cj.jdbc.Driver"/>
    <property name="type" value="10"/>
    <property name="user" value="root"/>
    <property name="password" value="root"/>
    …
</DB>

SPL 封装了标准 JDBC 接口,报表可以通过配置 JDBC 数据源来访问 SPL。 比如,在报表数据源处进行如下配置:

<Context>
    <Resource name="jdbc/esproc"
              auth="Container"
              type="javax.sql.DataSource"
              maxTotal="100"
              maxIdle="30"
              maxWaitMillis="10000"
              username=""
              password=" "
              driverClassName=" com.esproc.jdbc.InternalDriver "
              url=" jdbc:esproc:local:// "/>
</Context>

标准 JDBC,与普通数据库无异,这里不赘述。

然后在报表数据集中,使用访问存储过程的方式调用 SPL 脚本即可获得 SPL 的计算结果。调用计算实时订单的脚本 realTimeOrder.splx:

call realTimeOrder()

即可为报表输出计算后结果集。