Python 如何调用 SPL 脚本

【摘要】
      作为 client 端的 python 通过服务端的 jdbc 接口调用 SPL 脚本,非常容易实现 python 程序对 SPL 脚本的集成。具体开发要求、使用详细情况,请前往乾学院:Python 如何调用 SPL 脚本!

集算器提供了 JDBC 接口,Python 可以通过该接口调用 SPL。结构图如下:

aaapng

Python 调用 SPL 脚本是通过 py4j 接口 socket 方式访问 esProcJDBC Server,调用 dfx 脚本访问数据库或数据源获取数据。
esProcJDBC Server 封装了 JDBC 作为服务端,在运行 python 程序前,需要先启动服务端的 jvm。简单来说就是,将启动 JAVA 应用程序时加载集算器所需的 jar 包及配置文件放到项目中。需要注意的是,集算器 JDBC 所要求的 JDK 版本不得低于 1.8。

下面我们以 win10 系统、集算器安装在 D:\Program Files\raqsoft 路径 (后面简称[集算器目录]) 及集算器自带 java 为例来说明。

1. 加载驱动 jar

集算器 JDBC 是个完全嵌入式计算引擎,使用时需要 esProc JDBC 驱动、raqsoftConfig.xml, 主要依赖 jar 包如下:
esproc-bin-20210811.jar                   // 集算器计算引擎及 JDBC 驱动包
commons-math3-3.6.1.jar                  // 数据统计分析
py4j-0.10.9.2.jar                           // 与 java 通信包
esproc-py4j-server.2.10.jar            //esproc jdbc 服务端接口包 (此包放在外部库)

这里将上述 jar 包放在 d:/app/lib 下来说明,当然也可以放在其它目录或修改脚本加载的 jar。

启动 esProcJDBC Server 服务脚本start_jdbc_server.bat:

@echo off
rem START_HOME 为集算器安装目录,Server 运行时所依赖的集算器环境变量
set START_HOME="D:\Program Files\raqsoft"
set JAVA_HOME="D:\Program Files\raqsoft\common"
set EXECJAVA="D:\Program Files\raqsoft\common\jre\bin\java"
cd d:\app
rem parameter ip port path
rem example: start_server.bat localhost 25333 d:\tmp\data
start "dm" %EXECJAVA% -Xms128m -Xmx8520m -cp .; -Djava.ext.dirs=./lib; %START_HOME%\common\jdbc; -Dstart.home=%START_HOME%\esProc com.esproc.jdbc.EsprocJdbcServer %1 %2 %3

脚本设置了 java 运行环境变量,其中 START_HOME 为集算器安装的目录。脚本除了加载集算器目录下所需要的 jar 包外,还启用了它的环境配置等,如调用外部库,寻找 dfx 脚本文件等。脚本启动时缺省参数 ip=127.0.0.1,port=25333, 也可以更改 ip、port 参数。

部署和启动集算服务器的详细过程还可以参考Java如何远程调用 SPL脚本

2.    部署 raqsoftConfig.xml

集算器的运行环境配置文件 raqsoftConfig.xml 包含了集算器主路径、dfx 文件寻址路径 (本文为 demo 文件夹)、jdbc 数据源等各类信息, 存放在 [集算器目录]\esProc\config 下。start_jdbc_server.bat 脚本中设置 -Dstart.home=%START_HOME%\esProc,java 启动时会自动加载它,以便在执行 SPL 脚本时,在目录 demo 下查找对应的 dfx 文件或数据文件。

raqsoftConfig.xml 的具体信息也可再参考Java如何远程调用 SPL脚本

3.Python 调用

启动esProcJDBC Server后,Python 就可以调用 SPL 脚本了。Python 程序中调用 SPL 脚本,与 Python 中调用 SQL 和存储过程类似,将查询的数据以表结构形式 (具体为拆分成表头与数据两部分) 返回给 python。下面来看看具体是怎样实现的。

A 、执行 SPL 语句
如创建一个字段为 AA,BB,CC 的数据表,插入 3 条记录后返回其结果集。
SPL 脚本的代码为:

=create(AA,BB,CC).record(to(9))
Python 实现脚本如下:
import pandas as pd
from py4j.java_gateway import JavaGateway, GatewayClient

#缺省方式连接 server, 参数传递时python 集合转换为 java 集合

gateway = JavaGateway(auto_convert=True)
# 连接 server 时也可指定 ip,port 连接

#gateway = JavaGateway(GatewayClient(address="192.168.0.10",port=25333), auto_convert=True)

# 获取连接对象
conn = gateway.entry_point.getApp()

#程序主体处理部分
dfx="=create(AA,BB,CC).record(to(9))"
#执行脚本关返回结果集
result = conn.query(dfx)
# table header
cols = list(conn.getColumns())
# table data
rows = []
for lines in list(result):
rows.append(tuple(lines))
 
#数据存入 dataframe
df = pd.DataFrame(data=rows, columns=cols)
print(df)
conn.close()

输出结果:
  
   AA  BB  CC
0   1   2   3
1   4   5   6
2   7   8   9

由于采用 py4j 作为通信模块,用户并不需要过多关注收发数据等细节。conn.getColumns()返回的数据是JavaArray类型,需要用list()(tuple())转换数组(或元组)。有关pythonjava通信中的数据类型转换可参考相关的py4j文档,本jdbc接口返回为Table结构,转换采用list()tuple即可。

B 、在 SPL 中访问本地文件
如目录 demo 下有文本文件 aaa.txt,其内容为:
pid  age name     work
101 11   Tom        techer
102 12   Jack        manager
103 12   Joan       driver
104 13   Billy        doctor
105 15   Carl        driver
Python 实现部分脚本如下:
……
#程序主体处理部分
dfx= "select * from aaa.txt"
result = conn.query(dfx)
……
 
输出结果:
   pid  age  name   work
0  101   11  Tom    techer
1  102   12  Jack    manager
2  103   12  Joan    driver
3  104   13  Billy      doctor
4  105   15  Carl      driver

查询文本文件与查询 SQL 表数据类似,将返回的数据结果集通过 pandas 格式化输出。

C、带参数的 SPL 语句
参数是 SQL 语句的一个重要组成部分,同样,SPL 语句中也支持参数的使用。如上例查询 aaa.txt 文件中的部分数据,要求查询 age 大于 12 的记录。
Python 实现的部分代码如下:
……
#程序主体处理部分
dfx=“select * from aaa.txt where age>?”
result = conn.query(dfx,[12])
cols = list(conn.getColumns())
……
输出结果:
       pid  age   name    work
0  104   13   Billy  doctor
1  105   15   Carl   driver
参数以数组方式传递给 java 端,数组大小与 SPL 要求的参数个数保持一致,切记JavaGateway()需要指明参数 auto_convert=True。
 
D、有数据源的 SPL 语句

如访问数据库 mysql 下的 employee 数据表,将表中的数据作为结果集返回。启动 java 时需要加载 mysql jdbc 驱动程序。
mytest.dfx 脚本如下:


A B
1 =connect("mysql") //连接 mysql 数据库
2 =A1.query@x("select * from   employee") //查询 employee 表数据
3 return A2 //返回数据
其中 jdbc 数据源 mysql 在 IDE 中的配置如下:

bbbpng


Python实现的部分代码如下:
……
#程序主体处理部分
dfx="call mytest"
result = conn.query(dfx)
cols = list(conn.getColumns())
……
输出结果:
   EID  NAME  SURNAME GENDER  ...    BIRTHDAY    HIREDATE     DEPT SALARY
0    1  Rebecca  Moore     F  ...  1974-11-20  2005-03-11    R&D     7000
1    2  Ashley    Wilson     F  ...  1980-07-19  2008-03-16    Finance  11000
2    3  Rachel    Johnson    F  ...  1970-12-17  2010-12-01    Sales    9000
3    4  Emily     Smith      F  ...  1985-03-07  2006-08-15    HR      7000
4    5  Ashley    Smith      F  ...  1975-05-13  2004-07-30    R&D     16000
……
E、支持返回嵌套结果集:
市场中应用 Nosql 数据库非常多,其查询数据结果集中常带嵌套结构。现以外部库MongoDB为例说明, 在调用前需要设置 MongoDB 外部库为可用状态:

cccpng

MongoTest.dfx


A B
1 =mongo_open("mongodb://127.0.0.1:27017/raqdb") //连接 mongodb 数据库
2 =mongo_shell(A1,"storage.find()").fetch() //查询 storage 表数据
3 >mongo_close(A1) //关闭连接
4 return A2 //返回数据


A2
返回结果:

dddpng

Python 实现的部分代码如下:
……
#程序主体处理部分
dfx=“call MongoTest”
result = conn.query(dfx)
cols = list(conn.getColumns())
……

输出结果:

   _id           name           items
0  1000  Storage Alpha           category  name
                                              food    apple
                                              food    banana
                                              tool     h…
 
1  1001   Storage Beta                  category  name
                                              food    pear
                                              food    peach
                                              food    gra…
 
通过调用 SPL 脚本查询数据表,将其返回结果集存放到 pandas 格式化输出。嵌套结构支持序表或序列,对于更复杂的结构,也可在 java 端自定义接口进行数据转换。
 
F、游标查询:
相对于 query() 查询,游标查询分为执行 SPL 脚本与循环获取数据处理。我们用前面的 mysql 为数据源的 mytest.dfx 脚本来说明游标查询。
curs = gateway.entry_point.getApp()
dfx="call mytest"
#执行 SPL 脚本
curs.cursor(dfx)
cols = list(curs.getColumns())
 
# fetch data
rows = []
while(curs.hasNext()):
    result = list(curs.fetch(5))
    for line in result:
        rows.append(list(line))
 
df = pd.DataFrame(data=rows, columns=cols)
print(df)
……
游标查询时先执行 SPL 语句curs.cursor(dfx),再多次循环curs.fetch()获取数据。获取数据接口fetch(size)省略参数时size=1000
 
G返回多个结果集
Python从集算器获取返回的多个结果集列表时,每个结果集由表头与数据组成。

multiTable.dfx


A B
1 =create(AO,BO,CO).record(to(6)) //构造表 A1
2 =create(AS,BS).record(to(6,11)) //构造表 A2
3 =create(AT,BT,CT).record(to(20,25)) //构造表 A3
4 return A1,A2,A3 //同时返回多个表


dfx=“call multiTable”
#执行 SPL 脚本, 返回结果集列表
results = conn.mquery(dfx)
 
for result in results:
    rows = []
    cols = tuple(result[0])            # header
    for lines in tuple(result[1]):    # data
        rows.append(tuple(lines))
       
    df = pd.DataFrame(data=rows, columns=cols)
    print(df)
app.close()

查询 SPL 返回多结果集时,使用 mquery() 接口,通过遍历结果集列表,将每个结果集存放到 pandas 格式化输出。
python client调用esProcJdbc模块不同于Jaydebeapi调用 jdbc,每次运行 python 程序不需要启动 jvm,避免了运行效率不高问题。 同时,对开发用户来说,简明的开发流程,使用方便,让 Python 集成 SPL 也变得非常容易。
 
上述内容就是 Python 调用 SPL 脚本的常用方式了,其他程序调用 SPL 的用法:
Java 如何调用 SPL 脚本
Java 如何远程调用 SPL 脚本
Birt 调用 SPL 脚本
JasperReport 调用 SPL 脚本
HTTP 调用 SPL

想了解更多集算器应用集成用法的小伙伴儿可以去官网上的在线教程中查看