spl 集算器如何实现生产者消费者模型
问题描述:
现在有一个 3000 万数据文件需要导入数据库,目前做法是一次读取 160 万数据到内存,将 160 万数据根据序号分组分为 32 组,每组 5 万条。然后用 fork 使用 32 个线程去并行插入数据库。但是经过测试发现现在整个流程卡点是在读取 160 万数据这一步,因为整个流程是串行的。先加载 160 万数据,然后分批交给 32 个线程去插入。目前想法是希望实现生产者消费者模式,让读取文件和插入数据库并行执行。
准备两个序列,读取线程先去读取 160 万数据到序列 1,读取完成后,再由 32 个插入线程去批量插入。并且在插入的同时读取线程去接着读取 160 万数据到序列 2。这样就可以做到读取和插入操作并行。可是实在想不到这块逻辑用 spl 怎么去实现,求各位大佬援助

看描述是不想等每次读文件(160 万)后再写库,而是有两个 buffer(序列),buffer1 写库的同时读文件到 buffer2,然后轮换。
这样的话,可以试试 fork [读脚本, 写脚本] 来实现。
对,就是希望这么做老师。因为我理解在插入数据库的时候其实主要是网络 io 等待,cpu 是空闲的,希望能把系统资源尽可能压榨一下,提升下整体插入性能。老师麻烦有空了可以给我一段 demo 代码吗,您发的 fork 逻辑我没太看懂
fork [“read.splx”,“write.splx”] 里面 eval 动态 call 试试呢?
奥我懂你意思了,你是要我传入两个脚本名称,fork 开启两个线程动态 eval 去调用两个脚本吗,可是我记得 fork 里面是只能引用上面定义序列,没办法修改,我记得官网描述说的是 fork 里面没办法修改主线程定义的值
我明天试试,我肯定是需要定义多个变量去控制这两个序列的置换以及文件有没有读取完成。可是我在 fork 线程里面能修改主线程的变量值吗,我记得官网描述说是不可以的,我明天试试吧
可以用 env 搞全局变量,大佬们已经把 SPL 做的很顶啦😄
好的感谢大佬回复😄 ,我明天就去试试
感谢大佬,已经按照你说的方法成功实现这个逻辑了😄 就是我对全局变量还是有点疑问,我在子线程去修改全局变量的值是不是可以直接通过变量名修改而不是通过 env,我本地试了一下,好像是可以
是。不客气
env 在赋值时会在计算值前加锁,保证一致性。直接赋值不会,多线程并发有可能出乱子,赋值后又被另一个线程改掉了
嗯嗯了解了谢谢大佬, 还想咨询个问题, 就是我在使用 f.cursor() 的时候加了参数 @q, 因为文本数据中字符串是被引号包裹的, 可是我发现我就算读取的时候指定了字段类型为 string, 读出来数据还是会被引号包裹, 于是我发现有参数 @q 可以去除包裹字符串的引号, 可是我加上以后又遇到新问题, 就是存在字符串 "哈哈 \123" 这种里面包含转义符的字符串, 可是加了 @q 以后会被解析成 "哈哈 123",@q 选项好像把他当转义符给转义了, 怎么解决这个问题呀
转义总要处理的。不想处理,就自己读出来带引号的然后 run 一下把两边引号去掉,但很可能出错。\ 是 Java 的转义标准,可以用 @o 换成 Excel 的转义规则
好的大佬,我明天去试试。现在出现这个问题原因是因为这是上游数据卸数规范,这些数据并不是只有我们这个系统在用,有各种语言的系统,所以他们不会为了考虑 java 或者 Python 这种 \ 是转义符的系统去专门做处理而是保持原样卸数。spl 以后会考虑指定转义符功能吗,我理解读取文件时本身 java 也不会去转义,spl 读取的时候也确实没有转义,是用了 @q 以后才去转义了,我使用@q选项的想法只是想要去掉包裹字符串的引号,并不是让他去按照 java 规范去转义。感觉 spl 有点过度处理了,并不需要他干预字符串里面的内容。
用 replace 函数成功实现了