SPL:访问 Kafka
Kafka是一种分布式的信息流平台,它的客户端分为生产者、消费者两种,生产者提交数据,消费者读取数据。Kafka中传递的事件消息没有固定格式,消费者、生产者自由约定一些个性化格式,比如是JSON,XML等结构的一个大字符串,SPL能很好的解析这些数据,后续也能做一些复杂计算。
创建/关闭Kafka连接
使用方式类似关系数据库的JDBC连接,SPL也用成对的“创建/关闭”方式连接Kafka。
kafka_open(propertiesFile; topic1, topic2, ...), propertiesFile是连接参数,topic是Kafka主题,可以多个,从多主题中同时读写数据。
propertiesFile示例如下,更多参数可以参考Kafka官方文档
kafka_close(kafkaConn),kafkaConn是要关闭的Kafka连接。
示例:A1创建连接,中间做一些读写及计算操作后,A3关闭A1创建的连接
A |
|
1 |
=kafka_open("/kafka/my.properties", "topic1") |
2 |
…… |
3 |
=kafka_close(A1) |
生产者发送消息
kafka_send(kafkaConn, [key], value) ,kafkaConn是Kafka连接,key是消息的健,可以没有,value是消息内容。
示例:
A |
|
1 |
=kafka_open("/kafka/my.properties", "topic1") |
2 |
{"fruit":"apple","weight":"35kg"} |
3 |
{"fruit":"pear","weight":"48kg"} |
4 |
=kafka_send(A1, "A100", A2) |
5 |
=kafka_send(A1, "A101", A3) |
6 |
=kafka_close(A1) |
A2、A3是两条待提交的JSON格式事件,A4,A5用kafka_send()函数提交到Kafka平台。
消费者读取消息
kafka_poll(kafkaConn) ,kafkaConn是Kafka连接,默认读取所有未读取的数据:
A |
|
1 |
=kafka_open("/kafka/my.properties", "topic1") |
2 |
=kafka_poll(A1) |
3 |
=A2.derive(json(value):v).new(key, v.fruit, v.weight) |
4 |
=kafka_close(A1) |
A2读到的value是一个JSON字符串
A3用json()函数解析JSON字符串,用derive()、new()函数整理数据,形成规整的数据表:
如果数据量太大,可以加@c选项,这样就可以用游标方式分批加载数据。
A |
|
1 |
=kafka_open("/kafka/my.properties", "topic1") |
2 |
=kafka_poll@c(A1) |
3 |
=A3.fetch(1) |
4 |
=A3.fetch(10) |
5 |
=kafka_close(A1) |
执行后,A2中看不到数据,它只是一个游标对象。A3取第1条数据:
A4取之后的10行数据(实际上一共只有2条数据,所以这次只取到最后1条):
英文版