SPL 报表开发:轻量级低成本实时热数据报表方案

实时热数据报表,是指能够实时查询全量冷热数据的报表。早期业务只基于单个 TP 数据库时,这种报表并不是什么问题。但数据量大了,要拆分到专门的 AP 数据库后,就不一样了。因为冷热数据分离后,再做全量数据的实时报表,就会涉及多数据库的混合计算,而且 AP 库和 TP 库常常还是不同类型的,就更增加麻烦。

采用 HTAP 数据库融合 TP 与 AP 后,可以实现成早期单一数据库的架构,避免跨库混合运算。然而,迁移到新的 HTAP 数据库风险高、成本大,还不一定能复制原有 TP 和 AP 库的各种能力。

采用 Trino 等逻辑数仓引擎或者 Flink 等流处理引擎,再配合 kafka 传输数据,可以实现一定程度的实时同步以及多数据源混合计算。但是架构非常复杂沉重,会涉及元数据管理等诸多繁琐任务,涉及流计算的代码也不够简洁,同步实时程度较高时消耗的资源也较高,这会导致建设和应用成本都很高。

SPL 方案

..

SPL 通过原生数据源接口与 TP、AP 源连接,两边的数据分别各自处理后传输给 SPL 混合再计算,为报表提供实时计算结果。

SPL 有天然的多源混算能力,无论是 AP 库还是 Kafka 等流机制,无论是何种形式的数据仓库还是大数据平台,亦或者文件或云上对象存储,SPL 都可以直接访问读取并混合计算。

更详细地多源计算内容请参考: SPL 报表开发:不依赖逻辑数仓的轻量级多数据源报表

SPL 非常轻量,原来架构几乎不用动,既不涉及数据库迁移,也无需面临复杂的架构改动,只需将 SPL 集成(无缝)到报表应用中即可享受实时查询的便利。

而且,SPL 是临时从 TP 读取热数据再混合计算,这种结构对数据转储进 AP 库的实时性要求并不高,可以从容不迫地将数据做一些整理后再同步进 AP 库,以获得 AP 库更好的计算性能。

话不多说,我们来看一下 SPL 实现 T+0 计算的示例。

举个例

1. 业务场景

某电商平台希望实时监控用户的购买行为,并生成 实时商品销售转化率报表,以辅助营销策略调整。报表需展示以下指标:

  • 每小时内各商品的浏览量、购买量及转化率(购买量 / 浏览量)。

  • 每种商品实时的总销售额。

2. 报表目标

生成一张实时更新的报表,内容示例如下:

商品 ID

浏览量

购买量

转化率(%)

总销售额(元)

P001

500

100

20

15,000

P002

300

50

16.67

7,500

浏览量购买量:通过整合 实时用户点击流数据 和 实时订单数据获取。

总销售额:从 实时订单数据 和 历史订单数据 计算。

转化率:通过实时计算购买量与浏览量的比值获得。

3. 数据来源

TP 数据库(订单数据):存储最新的订单数据,支持事务性操作,实时写入。

Kafka 流数据:记录实时的用户行为数据(如用户浏览商品、加入购物车、完成结算等行为)。Hadoop 存储(历史订单数据):存储历史订单数据,采用 ORC 格式存储

(1)TP 数据库 - orders 表(存储最新的订单数据)

order_id

user_id

product_id

quantity

total_price

order_time

1001

2001

3001

2

400.00

2024-12-19 10:15:00

1002

2002

3002

1

150.00

2024-12-19 10:20:00

1003

2001

3003

3

600.00

2024-12-19 10:25:00

(2)Kafka 流数据 - 用户行为数据(过去一小时)

product_id

views

P001

500

P002

300

(3)Hadoop 存储 - his_orders 表(历史订单数据)

order_id

user_id

product_id

quantity

total_price

order_time

1001

2001

3001

2

400.00

2023-12-01 12:10:00

1002

2002

3002

1

150.00

2023-12-01 14:00:00

1003

2001

3003

3

600.00

2023-12-01 16:30:00

4. 实现脚本

编写 SPL 脚本 realTimeOrder.splx(历史脚本从 Hive 中读取):


A


1

=connect("mysqlDB")


2

=A1.query@x("SELECT product_id, SUM(quantity) AS total_sales, SUM(total_price) AS total_revenue FROM orders WHERE order_time > now()-1h GROUP BY product_id")

/实时订单数据

3

=kafka_open("/kafka/order.properties", "topic_order")


4

=kafka_poll(A3).value

/实时用户行为数据

5

=kafka_close(A3)


6

=hive_open("hdfs://192.168.0.76:9000", "thrift://192.168.0.76:9083","default","hive")

/连接 hive

7

=hive_table@o(A6)

/获取 ORC 表及文件

8

>hive_close(A6)


9

=A7.select(tableName=="his_orders")


10

=file(A9.location).hdfs_import@c()

/历史订单数据

11

=A10.select(interval(order_time,now())<=730)


12

=A11.groups(product_id;SUM(total_price):historical_revenue)


13

=join(A2:o,product_id;A4:v,product_id;A12:ho,product_id)

/关联三部分数据

14

=A13.new(o.product_id:product_id,v.views:views,o.total_sales/v.views:conversion_rate,o.total_revenue+ho.historical_revenue:total_revenue)


15

return A14


历史数据可以也可以从 HDFS 中直接读取,脚本如下:


A


1

=connect("mysqlDB")

/TP

2

=A1.query@x("SELECT product_id, SUM(quantity) AS total_sales, SUM(total_price) AS total_revenue FROM orders WHERE order_time > now()-1h GROUP BY product_id")

/实时订单数据

3

=kafka_open("/kafka/order.properties", "topic_order")

/Kafka

4

=kafka_poll(A3).value

/实时用户行为数据

5

=kafka_close(A3)


6

=file("hdfs://localhost:9000/user/86186/orders.orc")

/直接访问 HDFS

7

=A6.hdfs_import@c()

/历史订单数据

8

=A7.select(interval(order_time,now())<=730)


9

=A8.groups(product_id;SUM(total_price):historical_revenue)


10

=join(A2:o,product_id;A4:v,product_id;A9:ho,product_id)

/关联三部分数据

11

=A10.new(o.product_id:product_id,v.views:views,o.total_sales/v.views:conversion_rate,o.total_revenue+ho.historical_revenue:total_revenue)


12

return A11


整体脚本非常简单清晰。

5. 报表集成

SPL 与报表应用集成非常简单,只要将 SPL 的 esproc-bin-xxxx.jar 和 icu4j-60.3.jar 两个 jar 包引入到应用中(一般在 [安装目录]\esProc\lib 下),然后复制 raqsoftConfig.xml(也在上述路径下)到应用的类路径下即可。

raqsoftConfig.xml 是 SPL 的核心配置文件,名称不可更改。

在 raqsoftConfig.xml 中,需要配置外部库,这里用到的 Kafka:

<importLibs>
    <lib>KafkaCli</lib>
</importLibs>

同时 MySQL 数据源也需要配置在这里,在 DB 节点下配置连接信息:

<DB name="mysqlDB">
    <property name="url" value="jdbc:mysql://192.168.2.105:3306/raqdb?useCursorFetch=true"/>
    <property name="driver" value="com.mysql.cj.jdbc.Driver"/>
    <property name="type" value="10"/>
    <property name="user" value="root"/>
    <property name="password" value="root"/>
    …
</DB>

SPL 封装了标准 JDBC 接口,报表可以通过配置 JDBC 数据源来访问 SPL。 比如,在报表数据源处进行如下配置:

<Context>
    <Resource name="jdbc/esproc"
              auth="Container"
              type="javax.sql.DataSource"
              maxTotal="100"
              maxIdle="30"
              maxWaitMillis="10000"
              username=""
              password=" "
              driverClassName=" com.esproc.jdbc.InternalDriver "
              url=" jdbc:esproc:local:// "/>
</Context>

标准 JDBC,与普通数据库无异,这里不赘述。

然后在报表数据集中,使用访问存储过程的方式调用 SPL 脚本即可获得 SPL 的计算结果。调用计算实时订单的脚本 realTimeOrder.splx:

call realTimeOrder()

即可为报表输出计算后结果集。