esProc SPL 分组运算史上最强,没有之一

分组是常见的结构化数据计算,SQL 和 Python 都有相应的语句和函数来处理。不过,和 esProc SPL 提供的分组运算相比,这些语言都弱得多了。

常规的分组一般都会伴随着聚合,比如要计算各部门的员工人数,用 SQL 写出来是这样的:

select DEPT,count(1) emp_count from employee group by DEPT

这种基本的分组聚合运算,SPL 当然也会提供:

T("employee.csv").groups(DEPT;count(1):emp_count)

和 SQL 差不多。

除此之外,SPL 还有更丰富的分组运算,下面列举两种情况:

1. 分组子集

有时候我们不仅关心分组聚合值,还关心每组包含的明细数据,也就是分组子集。

比如,要查出人数大于 10 的部门有哪些员工,很简单的思路是按部门把全部员工拆成若干子集合,形成一个集合的集合,再从中过滤出长度大于 10 的分组子集,最后把这些子集合并,得到需要的结果。

这就不能在分组后强制聚合了,要保持分组子集。

SPL 支持分组子集:

T("employee.csv").group(DEPT).select(~.len()>10).conj()

group 函数按照部门拆分分组子集,select 过滤出长度大于 10 的子集,conj 合并这些子集。

SQL 分组总是和聚合绑定,没有分组子集的概念,这个问题的最简单解法要遍历两次:

select *
from employee
where DEPT in
    (select DEPT
     from employee
     group by DEPT
     having count(1)>10)
order by DEPT

先用子查询分组找出人数大于 10 的部门,再在外层查询中重新过滤出员工明细数据。

Python 本质上也没有分组子集的对象, groupby 返回结果并不是分组子集构成的集合,不过当前这个问题只要做一步过滤,还可以借助 lambda 语法较简洁地解决:

import pandas as pd
employee = pd.read_csv('employee.csv')
groups = employee.groupby('DEPT').filter(lambda x: len(x) > 10)
result = groups.sort_values(by='DEPT')

看起来和 SPL 的运算逻辑是一样的,相当于实现了分组子集概念,这里 lambda 里的 x 和 SPL 中的 ~ 有同样意义。但不同的是,filter 返回的不是集合的集合,而是自动拼成了单层集合,所以不像 SPL 那样还需要用 conj 做合并。

这样,filter 的结果就不再是分组子集的集合,而无法继续参与分组子集相关的运算了。

我们来进一步针对分组子集做更多操作:找出人数大于 10 的部门后,将这些部门按照其员工平均薪酬从大到小排列,部门内员工也按薪酬从大到小排列。

这就需要对分组子集构成的集合连续操作,先过滤后排序,再对每个子集排序。

SPL 可以很自然的接着刚才的代码继续实现这个过程:

=T("employee.csv").group(DEPT).select(~.len()>10).sort(-~.avg(SALARY)).(~.sort(-SALARY)).conj()

没有分组子集的 SQL 还是要遍历两次,而且还要配合 join:

select * from employee
join (
    select DEPT,avg(SALARY) as avg_salary,
    from employee
    group by DEPT
    having count(1) > 10
) dept_stats on employee.DEPT = dept_stats.DEPT
order by 
    dept_stats.avg_salary desc,
SALARY desc

子查询计算平均薪酬,内外层查询要改成 join 连接,代码有点绕。

Python 的分组子集机制此时也无能为力了,分组后再做的操作都会把结果变成单层表,如果要继续针对分组子集计算,就要再次分组以重新生成分组子集:

import pandas as pd
employee = pd.read_csv('employee.csv')
groups = employee.groupby('DEPT').filter(lambda x: len(x) > 10)
dept_avg_salary = groups.groupby('DEPT')['SALARY'].mean().reset_index(name='avg_salary')
merged_df = pd.merge(groups, dept_avg_salary, on='DEPT') 
sorted_df = merged_df.sort_values(by=['avg_salary', 'SALARY'], ascending=[False, False])
final_result = sorted_df[employee.columns]

第一步 filter 返回了单层表,之后要再次分组来计算聚合值。后面的代码还要延用前面 SQL 中的部分思路,用 merge 或者 join 将聚合值拼上才能实现排序的目标。

Python 并没有真正的分组子集机制,借助 Lambda 语法可以在一步运算中实现分组子集的效果,从而实现较复杂的聚合运算,这当然比 SQL 有优势得多,但和 SPL 相比还是差很远。

对于这个问题,Python 一定要只分组一次的话,也可以全部用前面 SQL 的思路,但仍会遍历两次,而且代码更繁琐,这里就不写了。

用到分组子集的计算非常常见,在著名的问答网站上很容易找出相关问题,比如:

https://stackoverflow.com/questions/78324132/how-do-i-perform-recursive-search-in-oracle

https://stackoverflow.com/questions/78346354/adding-tuple-rows-to-each-subtuple-group-in-sql

https://stackoverflow.com/questions/78456365/column-comparison-between-rows-of-the-same-group-used-in-partition-by-clause

2. 有序分组

分组子集常常会和有序分组相关。

比如求某支股票最长连涨天数,除了直接遍历的计算方法之外,还可以用分组来解决:将股票收盘价按日期排序后遍历数据,当某天上涨了,就把这一天和前一天分到一个组中,某天下跌了,则产生一个新组。遍历完成后,连续上涨的日期被分到了同一组,接下来只要考查哪一组成员数最多就可以了。

但是,这个分组并不是常规的等值分组,而是在遍历过程中按照条件变化有序分组。

SPL 就支持这种根据条件变化的有序分组:

stock.sort(Date).group@i(Price<=Price[-1]).max(~.len())

group 加上 @i 选项,会在表达式 Price<=Price[-1] 为真时,也就是股票没有涨价的情况下,产生新的分组。SPL 支持跨行引用,Price[-1] 是上一行的收盘价。

SQL 没有这种运算,就会麻烦很多:

SELECT MAX(ContinuousDays)
FROM(
    SELECT COUNT(*) AS ContinuousDays
    FROM (
        SELECT SUM(RisingFlag) OVER (ORDER BY Date) AS NoRisingDays
        FROM(
            SELECT Date, 
                    CASE WHEN Price > LAG(Price) OVER (ORDER BY Date) THEN 0
                    ELSE 1 END AS RisingFlag
            FROM stock
			)
         )
    )
GROUP BY NoRisingDays

由于没有有序分组,SQL 只能借助“累计未上涨天数”转换成普通的等值分组,整个过程非常绕。判断上涨和计算累计值都需要窗口函数配合子查询才能实现,结果就是两个窗口函数且嵌套四层的情况,代码难写难懂。

Python 也没有直接的有序分组,要像 SQL 一样衍生出一列累积值再转换成等值分组,思路同样绕,只是 Python 有更强的相邻计算,代码会比 SQL 简洁一些:

stock = pd.read_csv('stock.csv')
stock.sort_values(by=['Date'], inplace=True)
stock['NoRisingDays'] = stock['Price'].diff() > 0
grouped=stock.groupby((~stock['NoRisingDays']).cumsum())['NoRisingDays'].cumsum().where(stock['NoRisingDays'], 0)
max_increase_days = grouped.max()

涉及有序分组的实际问题也有很多,比如:

https://stackoverflow.com/questions/78442803/list-of-all-the-last-people-to-enter-the-elevator

https://stackoverflow.com/questions/78232342/sql-query-to-track-production-operations-outcome-progression-with-conditional-nu

https://stackoverflow.com/questions/64099063/oracle-sql-data-migration-row-to-column-based-in-month

分组子集和有序分组还有可能出现嵌套。

比如我们想统计每支股票的最长连涨天数,可以先按照股票代码分组,保持每个股票的分组子集,然后在每个子集内采用上述有序分组的办法计算最长连涨天数。

SPL 有真正意义上的分组子集概念,也支持有序分组,轻松实现两者的嵌套计算:

stock.sort(Date).group(Code;~.group@i(Price<=Price[-1]).max(~.len()):max_increase_days)

简单把前面两种用法组合起来就可以了。

SQL 则要在窗口函数中增加股票代码用来做分区字段:

SELECT Code, MAX(ContinuousDays)
FROM(
    SELECT Code, COUNT(*) AS ContinuousDays
    FROM(
        SELECT Code, Date, SUM(RisingFlag) OVER (PARTITION BY Code ORDER BY Code, Date) AS NoRisingDays
        FROM(
            SELECT Code, Date, 
                    CASE WHEN Price > LAG(Price) OVER (PARTITION BY Code ORDER BY Code, Date)  THEN 0
                    ELSE 1 END AS RisingFlag
            FROM stock
        	)
        )
    GROUP BY Code, NoRisingDays
    )
GROUP BY Code  

前面代码理解后,这样修改倒不是很难,但代码更繁琐了。

Python 既无法实现分组子集的连续操作,也不支持有序分组,只能用前面的办法把有序分组转换成等值分组,再加上多次重复分组实现:

import pandas as pd
stock = pd.read_csv('stock.csv')
stock.sort_values(by=['Code', 'Date'], inplace=True)
stock['NoRisingDays']=stock.groupby('Code')['Price'].diff().fillna(0).le(0).astype(int).cumsum()
grouped=stock.groupby(['Code','NoRisingDays']).size().reset_index(name='ContinuousDays')
max_increase_days = grouped.groupby('Code')['ContinuousDays'].max()
max_rise_df = max_increase_days.reset_index(name='max_increase_days')

代码很绕而且繁琐。

分组子集嵌套有序分组的实际问题也有不少:

https://stackoverflow.com/questions/78319976/postgresql-how-to-calculate-swipe-in-and-swipe-out-time

https://stackoverflow.com/questions/64116840/sql-formatting-to-user-friendly-date

https://stackoverflow.com/questions/78422651/identify-groups-of-sequential-records

SPL 还有对位分组,枚举分组,序号分组等更多业界独特的分组方法,远比 SQL 和 Python 丰富的多,是名副其实的史上最强。