SPL 报表开发:不依赖逻辑数仓的轻量级多数据源报表

背景

报表经常会基于多种数据源,如 RDB、NoSQL、文本、Excel、MQ 等。

..

借助逻辑数据仓库可以一定程度上实现多源混算,但其架构往往过于复杂和沉重,需要繁琐的预处理过程以运维管理工作,更适合大型机构的平台级解决方案。对于一个普通报表应用来说就太沉重了,专门建设逻辑数仓有点得不偿失。

SPL 方案

SPL 提供了多数据源实时混合计算功能,无论何种数据源只要能访问到就都能混算,非常轻量,还能直接嵌入到报表应用工作。报表、SPL、数据源的关系如下:

..

SPL 的数据源连接器分为两种:原生连接器和外部连接器。最常见的 RDB,文本、Excel、JSON 等本地文件,以及 HTTP 数据源等都属于原生连接器,内置在 SPL 核心体系中。而其他多样性数据源,如 MongoDB、Kafka、ElasticSearch、云存储都属于外部连接器,不在 SPL 核心体系内,需要另外部署。

与逻辑数仓的映射方式不同,SPL 支持并提倡使用数据源的原生语法实现数据访问和计算,如果数据源本身的计算能力不足,则可以使用 SPL 来补充,在后面实例中会具体看到。

SPL 支持数据源种类很多,这里列出了 SPL 支持的部分数据源,基本常见的数据源都包含了。

..

其中,原生连接器支持的数据源种类包括:MySQL、Oracle 等 JDBC 数据源;CSV、Excel、JSON 文件等 Local File;HTTP、RestAPI 等 Web 数据源,以及远程文件等。

SPL 提供了两种数据对象(序表和游标)用于访问数据源,分别对应内存数据表和流式游标。

包括关系数据库在内,几乎所有的数据源都会提供这两种数据对象的接口:小数据一次性读出,使用内存数据表;大数据要逐步返回,使用流式游标。有了这两种数据对象,就可以覆盖几乎所有的数据源了。

和逻辑数仓不同,SPL 不需要事先定义元数据做映射,直接使用数据源本身提供的方法来访问数据,只要封装成这两种数据对象之一即可。

这样可以保留数据源的特点,充分利用其存储和计算能力。当然更不需要像物理数仓那样先把数据做“某种”入库动作,实时访问就可以。

接下来看一下具体使用。

SPL IDE 配置连接器配置数据源

对于原生连接器,在 IDE 内配置最常见的 JDBC 连接,这里使用 MySQL。复制数据库驱动 jar 到:[SPL 安装目录]/ common/jdbc 下,如图示配置 MySQL 标准 JDBC 连接。

..

使用外部连接器,需要下载外部库驱动包,具体地址请参照 SPL 论坛上的下载链接。将其解压至任意目录(缺省在 [SPL 安装目录]\esProc\extlib)加载外部库。

外部库列表包括以下数十种非常规数据源及函数:

..

外部连接器放在外部库中,外部库是 SPL 提供的外部函数扩展库,将一些在普遍场景使用频率不高的专门用途函数以外部库形式提交,这样可以根据需要临时加载。

外部数据源种类繁多,也不是每种数据源都很常用,所以将这些连接器以外部库的形式提供会更为灵活,以后发现有新的数据源也可以及时补充而不影响现有的数据源。

绝大多数外部库都是外部连接器。

在 IDE 的菜单 tool-Options,Environment 选项卡下选择刚刚下载解压的外部库目录。

..

这时所有外部库都会列出来,然后勾选需要加载的外部库。

..

这里选择了 MongoDB,勾选确定后需要重启 IDE 才能生效。

混算实例

Mongodb 与 MySQL 混合查询

我们的报表查询实例来源于电商业务,使用 MySQL 存储订单相关信息,涉及的两个表 orders 和 order_items 结构是这样的:

Orders:

字段

类型

说明

order_id

INT (PK)

订单标识

user_id

INT

用户 ID

order_date

DATETIME

订单日期

total_amount

DECIMAL(10, 2)

订单金额

Order_items:

字段

类型

说明

order_item_id

INT (PK)

商品项标识

order_id

INT(FK)

订单 ID

product_id

VARCHAR(20)

商品 ID

quantity

INT

购买数量

price

DECIMAL(10, 2)

单价

两个表通过 order_id 关联。

由于商品信息会根据类型动态变化,比如电子产品有品牌、型号和规格,而服装则包括品牌、尺寸和颜色,因此使用 MongoDB 来存储商品信息,其中 attributes 字段为动态属性。

字段

说明

product_id

商品 ID(与 MySQL 关联)

name

名称

brand

品牌

category

分类

attributes

动态属性

商品集合通过 product_id 与 MySQL 关联。

这是 MySQL 和 MongoDB 的示例数据。

Orders:

order_id

user_id

order_date

total_amount

1

101

2024-09-01

1999.99

2

102

2024-09-02

1599.99




Order_items:

order_item_id

order_id

product_id

quantity

price

1

1

prod_1001

1

999.99

2

1

prod_1002

1

1000.00

3

2

prod_1003

1

1599.99





MongoDB:

{
  "product_id": "prod_1001",
  "name": "Smartphone X",
  "brand": "BrandA",
  "category": "Electronics",
  "attributes": {
    "color": "Black",
    "storage": "128GB"
  }
},
{
  "product_id": "prod_1002",
  "name": "Laptop Y",
  "brand": "BrandB",
  "category": "Computers",
  "attributes": {
    "processor": "Intel i7",
    "ram": "16GB"
  }
…
}

基于订单销售统计报表,查询最近一个月(时间段),‘Tablets’, ‘Wearables’, ‘Audio’ (商品种类)这三类商品的销售总额,会涉及跨 MySQL 和 MongoDB 混合查询。

SPL 实现脚本(orderAmount.splx):


A

1

=connect("mysql")

2

=A1.query@x("SELECT o.order_id, o.user_id, o.order_date, oi.product_id, oi.quantity, oi.price

FROM orders o JOIN order_items oi ON o.order_id = oi.order_id

WHERE o.order_date >= CURDATE()- INTERVAL 1 MONTH")

3

=mongo_open("mongodb://127.0.0.1:27017/raqdb")

4

=mongo_shell@d(A3,

"{'find':'products',

'filter': {

'category': {'$in': ['Tablets', 'Wearables', 'Audio'] }

}}

)

5

=A2.join@i(product_id,A4:product_id,name,brand,category,attributes)

6

=A5.groups(category;sum(price*quantity):amount)

7

return A6

A1 连接 MySQL,并在 A2 通过 SQL 查询一个月内的订单数据,选项 @x 表示查询后关闭数据库连接。查询结果是这样的。

..

A3 连接 MongoDB。

A4 使用 mongo_shell 函数查询并过滤数据,@d 选项表示返回序表。函数参数是 MongoDB Command 标准的 JSON 字符串,也就是 MongoDB 的原生语法。可以看到符合条件的数据查出来了。

..

A5 将两部分通过 product_id 进行关联,这是关联结果。

..

A6 按产品种类汇总订单金额,算出最终结果。

..

A7 通过 return 为报表返回计算后结果集。

Kafka 与 MongoDB 混合查询

接下来再使用 kafka 来参与计算。

基于前面的场景,使用 kafka 进行订单信息和商品信息发布,用户下单时,订单数据将被发送到 order 主题。

生产者脚本示例:


A

1

=kafka_open("/mafia/my.properties","topic-order")

2

[

{

"order_id": "1",

"user_id": "101",

"product_id": "prod_1001",

"quantity": 1

},

]

3

=kafka_send(A1, "A101", json(A2))

4

=kafka_close(A1)

A1 连接 kafka 服务器,用到的属性文件 my.properties 内容是这样的:

bootstrap.servers=localhost:9092
client.id=SPLKafkaClient
session.timeout.ms=30000
request.timeout.ms=30000
enable.auto.commit=true
auto.commit.interval.ms=1000
group.id=spl-consumer-group
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

A2 是要发送的订单数据,正常需要接收来自程序传递,这里仅是示例就直接写好。

A3 发送订单数据,A4 关闭连接。

报表数据来源是消费数据的脚本 (orderConsumer.splx),我们要查询指定分类下的商品信息,首先要设置商品分类参数:

..

脚本如下:


A

1

=kafka_open("/mafia/my.properties","topic-order")

2

=kafka_poll(A1)

3

=json(A2.value)

4

=mongo_open("mongodb://127.0.0.1:27017/raqdb")

5

=mongo_shell@d(A4,"{'find':'products'}")

6

=A3.join(product_id,A5:product_id,product_id,name,brand,category,attributes)

7

return A6.select(category== arg_category)

A2 获取 consumer 消息记录,返回序表,结果是这样:

..

A3 将订单数据 JSON 串转换成序表。

..

A5 查询商品数据。

..

A6 根据 product_id 关联,得到订单及商品所有信息。

..

最后在 A7 根据条件筛选符合的数据并为报表返回查询的结果集。至此,kafka 与 MongoDB 的混合查询报表也完成了。

大数据支持

报表查询还可能涉及较大数据量,这时都全量读取会很慢,甚至可能导致内存溢出。SPL 提供了游标机制将数据以流的形式逐步读出。

我们改一下前面 MySQL 和 MongoDB 混算的例子:


A

1

=connect("mysql")

2

=A1.cursor@x("SELECT o.order_id, o.user_id, o.order_date, oi.product_id, oi.quantity, oi.price

FROM orders o JOIN order_items oi ON o.order_id = oi.order_id

WHERE o.order_date >= CURDATE() - INTERVAL 1 MONTH

ORDER BY oi.product_id ASC")

3

=mongo_open("mongodb://127.0.0.1:27017/raqdb")

4

=mongo_shell@dc(A3,"{'find':'products','filter': {},'sort': {'product_id': 1}}")

5

=joinx(A2:o,product_id;A4:p,product_id)

6

=A5.groups(p.category;sum(o.price*o.quantity):amount)

7

return A6

A2 将原来的 query 改成 cursor,即返回游标,结果是一个游标对象,此时数据还并未真正读取。

..

A4 使用 @c 选项,意味着查询的 MongoDB 数据也返回游标,结果同样是游标对象。

A5 使用 joinx 函数将两个游标进行关联,使用该函数要求两个游标对关联字段有序,所以在 A2 的 SQL 和 A4 的 MongoDB 命令中都进行了排序。

A5 的关联结果仍然是游标,如果使用 fetch 取数可以看到真实结果是一个多层序表。

..

A6 按照分类进行汇总,得到计算结果。直到此时所有游标才真正开始取数计算。

..

A7 为报表返回计算后结果。几乎所有数据源都可以使用 SPL 游标来应对大数据查询报表。

与报表应用结合使用

下面看一下如何在报表应用中使用 SPL。

应用集成

SPL 与报表应用集成非常简单,只要将 SPL 的 esproc-bin-xxxx.jar 和 icu4j-60.3.jar 两个 jar 包引入到应用中(一般在 [安装目录]\esProc\lib 下),然后复制 raqsoftConfig.xml(也在上述路径下)到应用的类路径下即可。

raqsoftConfig.xml 是 SPL 的核心配置文件,名称不可更改。

在 raqsoftConfig.xml 中,需要配置外部库,使用到哪个配置哪个即可。比如这里用到的 MongoDB 和 Kafka:

<importLibs>
    <lib>KafkaCli</lib>
    <lib>MongoCli</lib>
</importLibs>

同时 MySQL 数据源也需要配置在这里,在 DB 节点下配置连接信息:

<DB name="mysql">
    <property name="url" value="jdbc:mysql://127.0.0.1:3306/raqdb?useCursorFetch=true"/>
    <property name="driver" value="com.mysql.cj.jdbc.Driver"/>
    <property name="type" value="10"/>
    <property name="user" value="root"/>
    <property name="password" value="root"/>
    <property name="batchSize" value="0"/>
    <property name="autoConnect" value="false"/>
    <property name="useSchema" value="false"/>
    <property name="addTilde" value="false"/>
    <property name="caseSentence" value="false"/>
</DB>

报表访问 SPL

SPL 封装了标准 JDBC 接口,报表可以通过配置 JDBC 数据源来访问 SPL。 比如,在报表数据源处进行如下配置:

<Context>
    <Resource name="jdbc/esproc"
              auth="Container"
              type="javax.sql.DataSource"
              maxTotal="100"
              maxIdle="30"
              maxWaitMillis="10000"
              username=""
              password=" "
              driverClassName=" com.esproc.jdbc.InternalDriver "
              url=" jdbc:esproc:local:// "/>
</Context>

标准 JDBC,与普通数据库无异,这里不赘述。

然后在报表数据集中,使用访问存储过程的方式调用 SPL 脚本即可获得 SPL 的计算结果。调用前面计算订单汇总的脚本 orderAmount.splx:

call orderAmount()

调用带参数的订单查询脚本 orderConsumer.splx:

call orderConsumer(?)

其中,? 代表指定的商品类别参数,参数值由报表查询时指定。

这里我们仅演示了部分数据源间的混合计算,SPL 支持的数据源很多,而且还能扩展。

由于提供了统一数据对象,只要 SPL 能访问到,就都能混合计算,非常简单,这样报表就可以获得实时数据混合查询的能力,快速实现多源、T+0 报表。