SPL:访问 Elasticsearch

Elasticsearch 是一个分布式搜索服务器,它提供了Restful API更新数据、搜索数据。提交的数据、搜索结果都是JSON格式,计算引擎语言SPLjson()函数解析JSON字符串,转换成容易做数据计算的SPL序表,因此,也特意提供了es_rest()函数调用Elasticsearch API读写数据。

es_rest(url, method, content; httpHeader1, httpHeader2, …),第一个参数是链接地址,不同操作的url有不同的尾部格式,支持httphttps;第二个参数是HTTP method,值可能是GET/PUT/POST/DELETE;第三个参数是HTTP请求提交的内容,但也有些操作不提交任何内容,这个参数就可以省略。分号后面是多个HTTP header,是验证身份的API Key、指明内容格式等一些信息。每种RestHTTP请求,使用哪种method,提交什么内容,设置哪些HTTP header,这些细节参考其官网文档。

示例:


A

1

>apikey="Authorization:ApiKey a2x6aEF……KZ29rT2hoQQ=="

2

'{

"counter" : 1,

"tags" : ["red"]

,"beginTime":"2022-01-03"

,"endTime":"2022-02-15"

}

3

=es_rest("https://localhost:9200/index1/_doc/1", "PUT",A2;"Content-Type: application/x-ndjson",apikey)

4

=json(A3.Content)

5

'{

"counter" : 2,

"tags" : ["gray"]

,"beginTime":"2022-08-03"

,"endTime":"2022-12-15"

}

6

=es_rest("https://localhost:9200/index2/_doc/1", "PUT",A2;"Content-Type: application/x-ndjson",apikey)

7

'{

"script" : {

"source": "ctx._source.counter += params.count",

"lang": "painless",

"params" : {

"count" : 4

}

}

}

8

=es_rest("https://localhost:9200/index1/_update/1", "POST",A7;"Content-Type: application/x-ndjson",apikey)

9

'{

"docs": [

{

"_index": "index1"

,"_id":1

}

,{

"_index": "index2"

,"_id":1

}

]

}

10

=es_rest("https://localhost:9200/_mget", "GET",A9;apikey,"Content-Type: application/json")

11

=json(A10.Content)

12

'{

"query": {

"bool": {

"filter": {

"range": {

"beginTime": {"gte": "2022-01-01", "lte": "2022-07-01"}

}

}

,"boost" : 1.0

}

}

}

13

=es_rest("https://localhost:9200/_search","GET",A12;apikey,"Content-Type: application/json")

14

=json(A13.Content)

15

=es_rest("https://localhost:9200/index1/_doc/1", "DELETE";apikey)

16

=json(A15.Content)

A1中是API Key,之后访问Elasticsearch的操作都会用到它;

A2是一条待写入的新数据。

A3es_rest()函数执行写入,执行后,HTTP响应的信息都会列出来:

..

A3Content字段是执行结果JSON字符串,A4json()函数把其转换成SPL序表,方便查看:

..

A6再写入第二条数据。

A7定义一段更新数据的json,把index1counter的值从1增加到5A8执行更新。

A9定义一段多行查询json,观察A11中的结果,把嵌套结构的json字符串转换成了多层序表:

..

A12定义一段按时间搜索的json,查询beginTime2022年上半年的数据,观察A14中的结果序表:

..

A15执行删除index1的操作。