SPL:访问 ORC、Parquet

Parquet 和 ORC 都是 hadoop 中用于存储数据的列式存储格式, hadoop 提供了 Java API 读写它们的文件,集算器把实现读取文件的 JAVA 代码做了封装,形成更易用的 SPL 函数。通过这些函数,直接把 Parquet、ORC 文件数据加载到集算器中进行计算。本文详细介绍这些函数的用法。

创建 / 关闭 HIVE 连接

使用方式类似数据库连接,SPL 也用成对的“创建 / 关闭”方式连接 HIVE。

hive_open(hdfsUrl,thriftUrl,dbName,hdfsUserName),参数 hdfsUrl 是 HDFS 服务器的地址,thriftUrl 是 Hive 元数据的 URI,dbName 为 hive 下的数据库名称,hdfsUserName 是 hdfs 用户名。
hive_close(conn),conn 是要被关闭的 HIVE 连接。

示例:A1 创建连接,中间做一些读写、计算操作后,A3 关闭 A1 创建的连接

A B
1 =hive_open("hdfs://192.168.0.76:9000", "thrift://192.168.0.76:9083","default","hive")
2 ……
3 >hive_close(A1)

获取数据库列表

hive_db(conn), 获取 hive 下的数据库列表

A B
1 =hive_open("hdfs://192.168.0.76:9000", "thrift://192.168.0.76:9083","default","hive")
2 =hive_db(A1)
3 >hive_close(A1)

返回数据:

dbName
1 default
2 mydb

获取表与文件位置列表

hive_table(conn, [dbname]), 在指定数据库中,获取表名及其对应 hdfs 文件位置的列表,无参数 dbname 时使用默认数据库;无选项时列出所有的表,选项 @o 列出 orc 类型表,选项 @p 列出 parquet 类型表。

A B
1 =hive_open("hdfs://192.168.0.76:9000", "thrift://192.168.0.76:9083","default","hive")
2 =hive_table(A1) /列出默认数据库下所有的表
3 =hive_table@o(A1) /列出默认数据库下 orc 表
4 =hive_table@p(A1) /列出默认数据库下 parqet 表
5
6 =hive_table(A1, "mydb") /列出 mydb 数据库下所有的表
7 =hive_table@o(A1, "mydb") /列出 mydb 数据库下 orc 表  
8 =hive_table@p(A1, "mydb") /列出 mydb 数据库下 parqet 表
9 >hive_close(A1)

其中 A3 返回数据:

tableName location
1 tstage hdfs://master:9000/user/hive/warehouse/tstage
2 inbound_orc hdfs://master:9000/user/hive/warehouse/inbound_orc
3 store_orc hdfs://master:9000/user/hive/warehouse/store_orc
4 orc_read hdfs://master:9000/user/hive/warehouse/orc_read
5 store_orc2 hdfs://master:9000/user/hive/warehouse/store_orc2

列出缺省数据库 default 下 ORC 表与对应的 HDFS 文件位置。

读取数据

f.hdfs_import@pc([col,…]),读取 orc、parquet 格式数据。参数 f:orc、parquet 格式文件, 本地或 hdfs 文件;col: 读出的字段,缺省为表中所有字段。选项 @p :读取parquet格式,缺省为读取orc格式;@c:游标方式读取,缺省为直接读取。

1、读取本地 ORC 数据

通过 file(fileName) 加载本地 orc 文件, 通过 f.hdfs_import() 读取数据。

A B
1 =file("H:/tmp/data/store.orc") /加载本地文件
2 =A1.hdfs_import() /全字段读取
3 =A1.hdfs_import("total","product") /过滤 total 等字段
4
5 =A1.hdfs_import@c() /游标方式读取
6 =A5.skip(100) /跳过 100 条数据
7 =A5.fetch(100) /获取 100 条数据

A2 返回数据:

Index store   product   total
1 store_id_000001 1 211
2 store_id_000001 3 1253
3 store_id_000001 4 458
4 store_id_000001 5 945

对于大数据 ORC 表,在 Hive 下当文件达到 HDFS 块大小的最大值时进行分割,每个文件有对应的记录数,使用 skip(N) 跳过 N 条数据,对记录不在的块则直接跳过该块,非常适合翻页显示等处理。

2、读取 HDFS ORC 数据

通过 file(fileName) 加载 hdfs orc 文件, 通过 f.hdfs_import() 读取数据。

A B
1 =file("hdfs://localhost:9000/user/86186/store.orc") /加载 hdfs 文件
2 =A1.hdfs_import() /全字段读取
3 =A1.hdfs_import("total","product") /过滤 total 等字段
4
5 =A1.hdfs_import@c() /游标方式读取
6 =A5.skip(100) /跳过 100 条数据
7 =A5.fetch(100) /获取 100 条数据

通过前面的 hive_table()或 hive shell 方式可获取 hive 表对应的 hdfs 文件位置,通过 f.hdfs_import() 函数加载文件读取数据。

 

3、通过 HIVE 表读取 HDFS 数据

结合 hive_table(),从 HIVE 表名中解析出 HDFS 文件地址,读取 hdfs 文件数据。

A B
1 =hive_open("hdfs://192.168.0.76:9000", "thrift://192.168.0.76:9083","default","hive") /连接 hive
2 =hive_table@o(A1) /获取 ORC 表及文件
3 >hive_close(A1) /关联 hive 连接
4 =A2.select(tableName=="nested_struct_t") /查找表 nested_struct_t
5 =file(A4.location) /加载 hdfs 文件
6 =A5.hdfs_import() /读取数据
7

A6 返回值:

100png

A4 通过 hive 表名从序表 A2 中查找表的记录,获取对应 hdfs 文件,读取数据。

4、使用复合结构

ORC、Parquet 格式支持 map, 嵌套、数组等复合数据结构类型,以便更好地处理结构化数据。下面结合 my_orc_nested 表,并给出相应的代码示例说明

-- 创建表

CREATE TABLE my_orc_nested (

  id INT,

  name STRING,

  address STRUCT<street:STRING, city:STRING, state:STRING>,

  phones MAP<STRING, STRING>,

  location ARRAY<STRING>

)STORED AS ORC;

 

-- 插入数据

INSERT INTO TABLE my_orc_nested VALUES

  (1, 'John', named_struct('street', '123 Main St', 'city', 'New York', 'state', 'NY'), map('home', '123-456-7890', 'work', '987-654-3210'), array('Charlie', 'David', 'Eve')),

  (2, 'Jane', named_struct('street', '456 Elm St', 'city', 'San Francisco', 'state', 'CA'), map('home', '111-222-3333'), array('Beijing', 'Herbin')),

  (3, 'master', named_struct('street','John Doe', 'city','Chongqing', 'state','BJ'), map('phone','2233445566'),array('Chongqing', 'Chengdu'));

Hive shell 使用 sql insert 脚本新增加数据时,对复合结构数据需要用对应的函数操作。named_struct()、map()、array() 函数操作分别对应嵌套结构、map 结构、数组结构。

A B
1 =file("hdfs://localhost:9000/user/86186/hive/my_orc_nested")
2 =A1.hdfs_import   ("name", "phones", "address") /过滤 name 等字段
3 =A1.hdfs_import   ()
4 =A3.(address).conj() /合并 address 数据
5 =A3.(phones).(~.fname().count()) /获取 phones 个数
6 =A3.(location).(~.count()) /获取 location 个数
7

A2 返回值:

101png

过滤字段,调整字段显示顺序。

A3 返回值:

102png

计算返回结果中,嵌套结构返回序表,map 结构返回单记录的序表,数组结构返回序列。
A4 返回值:

103png

合并嵌套结构 address 字段数据,可方便后面的计算。

A5 返回值:

104png

将 map 结构的 phones 记录通过键值计算个数。

A6 返回值:

105png

将数组结构的 location 记录计数。
对于复合数据结构的记录,读取 parquet 与 ORC 类似。根据需要参考存储的结构类型特点采用对应的方法处理,SPL 将嵌套结构的提升为扁平结构尤其便利、方便计算。

5、读取 Parquet 数据

利用 f.hdfs_import@pc([col1,…]) 读取 Parquet 数据与读取 orc 数据类似,其用法也相同,游标中的 skip() 处理也类似,在此不再细述了。前面讲述了复合结构结合序表的基本用法,这里讲述多层嵌套结构扁平化处理及数组取值。
hive shell 建表与添加数据脚本:
CREATE TABLE nested_struct_t (
  id INT,
  info struct<name:STRING, age:INT, contact:struct<email:STRING, phone:STRING>>,
  location ARRAY<STRING>
)
STORED AS PARQUET;

INSERT INTO TABLE nested_struct_t VALUES
    (1, named_struct('name','Kin dono', 'age',25, 'contact',named_struct('email', 'Kining@example.com', 'phone','1234567890')), array('Wuhan', 'Beijing', 'Xian')),
    (2, named_struct('name','Jane Smith', 'age',32, 'contact',named_struct('email','janesmith@example.com', 'phone','0987654321')), array('Beijing', 'Herbin')),
    (3, named_struct('name','John Doe', 'age',15, 'contact', named_struct('email','johndoe@example.com','phone','2233445566')),array('Chongqing', 'Chengdu'));

A B
1 =file("hdfs://localhost:9000/user/86186/nested_struct_test")
2 =A1.hdfs_import@p() /全表查询
3 =A1.hdfs_import@p("id",   "location", "location[0]", "location[2]") /数组字段过滤
4 =A1.hdfs_import@p("info",   "info.name", "info.contact",   "info.contact.email") /嵌套字段过滤
5

A2 返回值:

106png

点击 info 首行记录展开为:

107png

点击 contact 首行记录展开为:

108png

A3 返回值:

109png

对于数组查询可指定索引号,越界的值为空。

A4 返回值:

110png

对于多层嵌套结构查询,通过扁平化处理转换成行列数据。

SPL 函数对读取 Parquet、ORC 文件的封装,将本地文件与 hdfs 文件读取方式的统一,极大地简化了用户操作,提高了使用效率。