SPL:访问 Kafka

Kafka是一种分布式的信息流平台,它的客户端分为生产者、消费者两种,生产者提交数据,消费者读取数据。Kafka中传递的事件消息没有固定格式,消费者、生产者自由约定一些个性化格式,比如是JSONXML等结构的一个大字符串,SPL能很好的解析这些数据,后续也能做一些复杂计算。

创建/关闭Kafka连接

使用方式类似关系数据库的JDBC连接,SPL也用成对的创建/关闭方式连接Kafka

kafka_open(propertiesFile; topic1, topic2, ...) propertiesFile是连接参数topicKafka主题,可以多个,从多主题中同时读写数据

propertiesFile示例如下,更多参数可以参考Kafka官方文档

..

kafka_close(kafkaConn)kafkaConn是要关闭的Kafka连接。

示例:A1创建连接,中间做一些读写及计算操作后,A3关闭A1创建的连接


A

1

=kafka_open("/kafka/my.properties",   "topic1")

2

……

3

=kafka_close(A1)

生产者发送消息

kafka_send(kafkaConn, [key], value) kafkaConnKafka连接,key是消息的健,可以没有,value是消息内容。

示例:


A

1

=kafka_open("/kafka/my.properties",   "topic1")

2

{"fruit":"apple","weight":"35kg"}

3

{"fruit":"pear","weight":"48kg"}

4

=kafka_send(A1, "A100", A2)

5

=kafka_send(A1, "A101", A3)

6

=kafka_close(A1)

 

A2A3是两条待提交的JSON格式事件,A4,A5kafka_send()函数提交到Kafka平台。

 

消费者读取消息

kafka_poll(kafkaConnkafkaConnKafka连接,默认读取所有未读取的数据:


A

1

=kafka_open("/kafka/my.properties",   "topic1")

2

=kafka_poll(A1)

3

=A2.derive(json(value):v).new(key,   v.fruit, v.weight)

4

=kafka_close(A1)

 

A2读到的value是一个JSON字符串

..

 

A3json()函数解析JSON字符串,用derive()new()函数整理数据,形成规整的数据表:

..

 

如果数据量太大,可以加@c选项,这样就可以用游标方式分批加载数据。


A

1

=kafka_open("/kafka/my.properties",   "topic1")

2

=kafka_poll@c(A1)

3

=A3.fetch(1)

4

=A3.fetch(10)

5

=kafka_close(A1)

 

执行后,A2中看不到数据,它只是一个游标对象。A3取第1条数据:

..

 

A4取之后的10行数据(实际上一共只有2条数据,所以这次只取到最后1条):

..


以下是广告时间

对润乾产品感兴趣的小伙伴,一定要知道软件还能这样卖哟性价比还不过瘾? 欢迎加入好多乾计划。
这里可以低价购买软件产品,让已经亲民的价格更加便宜!
这里可以销售产品获取佣金,赚满钱包成为土豪不再是梦!
这里还可以推荐分享抢红包,每次都是好几块钱的巨款哟!
来吧,现在就加入,拿起手机扫码,开始乾包之旅



嗯,还不太了解好多乾?
猛戳这里
玩转好多乾