从 ES Kafka Mongodb Restful ... 取到 json 之后

json 是个好东西,它可以使用公共的文本形式承载了丰富的结构化数据的信息。现代很多技术都在喜欢使用 json 作为数据传输格式,比如 Elastic Search,Restful,Kafka 等,Mongodb 这类对性能较在意的技术则使用了二进制化的 json。

结构化的数据常常是批量的,也常常是需要再计算的。
但是,json 相关的类库却没什么方便用来做计算的。jsonpath 解析 json 没问题,却没什么计算能力,简单的过滤聚合还可以,稍复杂到分组汇总就不灵了,基本上是靠自己硬编码完成了。
写进数据库来算?这也太沉重了。何况,json 经常是多层的结构化数据,写进关系数据库要建几个关联的表,入库的成本远高于计算本身了。

esProc SPL 来帮你。
esProc SPL 是纯 Java 开发的开源计算引擎,在这里 https://github.com/SPLWare/esProc

esProc SPL 对 json 库进行了封装,一句话就可以把 json 文本解析成可计算的 SPL 序表(SPL 的内存结构化数据对象):


A

1

=file("d:\\xml\\emp_orders.json").read()

2

=json(A1)

SPL 序表天然多层结构,即字段取值可以是另一个序表,这和 json 天然契合:
8a12b10be5_1png

一旦转换成 SPL 序表之后,计算本身就是 esProc 的强项了。过滤、分组、连接都不在话下,大部分计算目标都可以一句话完成:

Filter:T.select(Amount>1000 && Amount<=3000 && like(Client,"*s*"))
Sort:T.sort(Client,-Amount)
Distinct:T.id(Client)
Group:T.groups(year(OrderDate);sum(Amount))
Join:join(T1:O,SellerId; T2:E,EId)
TopN:T.top(-3;Amount)
TopN in group:T.groups(Client;top(3,Amount))

这些内容很多,这里就不展开了,感兴趣的小伙伴可以到 esProc SPL 的官网去参考相关资料。

esProc SPL 已经封装了很多常见的 json 数据源的访问接口。
Restful:纯文本式的 json,计算完了还可以反向生成 json 文本


A

1

=httpfile("http://127.0.0.1:6868/restful/emp_orders").read()

2

=json(A1)

3

=A2.conj(Orders).select(Amount>1000 && Amount<=2000 && like@c(Client,"*business*"))

4

=json(A3)

Elastic Search:可以直接在 SPL 代码中写 json 常数后参与传输和计算


A

1

>apikey="Authorization:ApiKey a2x6aEF……KZ29rT2hoQQ=="

2

'{

"counter" : 1,

"tags" : ["red"]

,"beginTime":"2022-01-03"

,"endTime":"2022-02-15"

}

3

=es_rest("https://localhost:9200/index1/_doc/1", "PUT",A2;"Content-Type: application/x-ndjson",apikey)

4

=json(A3.Content)

Mongodb:二进制化的 json 也没问题


A

1

=mongo_open("mongodb://127.0.0.1:27017/mymongo")

2

=mongo_shell(A1,"{'find':'orders',filter:{OrderID: {$gte: 50}},batchSize:100}")

3

=A2.cursor.firstBatch.select(Amount>1000 && Amount<=2000 && like@c(Client,"*business*"))

4

=mongo_close(A1)

Kafka:SPL 也封装了向这些数据源写出的接口,形成 IO 闭环


A

1

=kafka_open("/kafka/my.properties", "topic1")

2

=kafka_poll(A1)

3

=A2.derive(json(value):v).new(key, v.fruit, v.weight)

4

=kafka_send(A1, "A100", json(A3))

5

=kafka_close(A1)

对于 Mongodb,Kafka 这类可能返回大数据量的数据源,esProc SPL 还提供游标对象和方法,可以逐步读取,边读边处理。这里就不详细举例了,小伙伴也可以去官网查阅资料。

通常 json 数据不会单独存在,还会和其它数据源交换数据以及混合计算。esProc SPL 当然也不只专门为了对付 json 而发明的,它是专业的计算引擎,能支持的数据源非常丰富:

clipboardpng

这些数据源都有被 SPL 读成序表和游标,再实现混合计算以及交换数据就非常容易了。

那么,esProc SPL 写出来的代码如何集成到应用程序中呢?
很简单,esProc 提供了标准的 JDBC 驱动,被 Java 程序引入后,就可以使用 SPL 语句了,和调用数据库 SQL 一样。

Class.forName("com.esproc.jdbc.InternalDriver");
Connection conn =DriverManager.getConnection("jdbc:esproc:local://");
Statement statement = conn.createStatement();
ResultSet result = statement.executeQuery("=json(file(\"Orders.csv\")).select(Amount>1000 && like(Client,\"*s*\")

较复杂的 SPL 脚本可以存成文件,然后就像调用存储过程一样:

Class.forName("com.esproc.jdbc.InternalDriver");
Connection conn =DriverManager.getConnection("jdbc:esproc:local://");
CallableStatement statement = conn.prepareCall("call queryOrders()");
statement.execute();

作为纯 Java 开发的软件,esProc SPL 可以完全无缝地集成进 Java 应用中,就和应用程序员自己写的代码一样,一起享受成熟 Java 框架的优势。SPL 本身有完善的流程控制语句,像 for 循环,if 分支都不在话下,还支持子程序调用。只用 SPL 就能实现非常复杂的业务逻辑,直接构成完整的业务单元,不需要上层 Java 代码来配合,主程序只要简单地调用 SPL 脚本就可以了。

将 SPL 脚本存储成文件,置于主应用程序之外,代码修改可以独立进行且立即生效,不像 Java 代码在修改代码后还要重新编译,整个应用都要停机重启。这样可以做到业务逻辑的热切换,特别适合支持变化频繁的业务,而这也是 json 广泛应用的地方。

clipboardpng