esproc vs python 6

本节在数据量比较大的情况下,对比esprocpython

数据量:7000多条万记录5个字段分别是orderidclientidselleridamountdate。总大小超过3G

1. 筛选8月份的交易记录

esproc


A

1

=now()

2

=file("E:\\orders_big_data\\orders.csv").cursor@tc()

3

=A2.select(month(date)==8).fetch()

4

=interval@ms(A1,now())

A2f.cursor()

根据文件f创建游标并返回,数据扫描完将自动关闭游标。@t, f中第一行记录作为字段名,不使用本选项时默认使用_1_2,…作为字段名. @c, s时用逗号分隔。如果同时有s则用s分隔。

A3:筛选出8月份的订单记录并取出结果

esproc并行代码:


A

1

=now()

2

=file("E:\\orders_big_data\\orders.csv").cursor@tmc(;16)

3

=A2.select(month(date)==8).fetch()

4

=interval@ms(A1,now())

A2:cursor@m(;n)@m选项,返回成多路游标,n表示路数。这时结果可能改变原来数据的顺序(筛选数据大多数情况下也不需要保持原序)

python

import time

import pandas as pd

import numpy as np

s = time.time()

chunksize=1000000

order_data = pd.read_csv('E:\\orders_big_data\\orders.csv',iterator=True,chunksize=chunksize)

i = 0

month_8_list = []

for chunk in order_data:

    chunk['date'] = pd.to_datetime(chunk['date'])

    chunk_month_8 = chunk[chunk['date'].dt.month==8]

    month_8_list.append(chunk_month_8)

month_8 = pd.concat(month_8_list,ignore_index=True)

print(month_8)

e = time.time()

print(e-s)

定义chunksize大小为1000000万条记录。

pd.read_csv(fileorbuf,iterator,chunksize) iterator,返回一个TextFileReader 对象,以便逐块处理文件,chunksize文件块大小。

循环读取文件,每次都取chunksize的大小,筛选出8月份的记录,放入初始化的list中。

合并list中的dataframe得到结果。

pandas本身不支持并行,所以这里没有python的并行测试。

结果:

esproc单线程

esproc并行:

python


并行数

耗时

esproc

不并行

121.068

esproc

2

82.117

esproc

4

68.897

python

不并行

85.726

 

2. 计算出每个销售员每年的销售额和订单数

esproc


A

1

=now()

2

=file("E:\\orders_big_data\\orders.csv").cursor@tc(sellerid,date,amount)

3

=A2.groups(sellerid,year(date):y;sum(amount):amount,count(~):count)

4

=interval@ms(A1,now())

A2:按照sellerid和年份进行分组,同时汇总amountcount数。只取出计算需要的字段。

esproc并行代码


A

1

=now()

2

=file("E:\\orders_big_data\\orders.csv").cursor@tmc(sellerid,date,amount;4)

3

=A2.groups(sellerid,year(date):y;sum(amount):amount,count(~):count)

4

=interval@ms(A1,now())

python

import time

import pandas as pd

import numpy as np

s = time.time()

chunksize=1000000

order_data = pd.read_csv('E:\\orders_big_data\\orders.csv',usecols=['sellerid','date','amount'],iterator=True,chunksize=chunksize)

chunk_g_list = []

for chunk in order_data:

    chunk['date'] = pd.to_datetime(chunk['date'])

    chunk['y'] = chunk['date'].dt.year

    chunk_g = chunk.groupby(by=['sellerid','y']).agg(['sum','count']).reset_index()

    chunk_g_list.append(chunk_g)

order_group = pd.concat(chunk_g_list,ignore_index=True).groupby(by=['sellerid','y'],as_index=False).agg({('amount','sum'):['sum'],('amount','count'):['sum']})   

order_group.columns = ['sellerid','y','amount','count']

print(order_group)

e = time.time()

print(e-s)

定义chunksize100

pd.read_csv(fileorbuf,usecols,iterator,chunksize)usecols是取出需要的字段。

按照chunksize循环读取数据

转换date字段的格式为pandasdatetime格式

新增一列年份y

df.groupby(by),按照selleridy进行分组。df.agg(),对分组数据进行多种计算。这里是对分组数据amount进行sumcount计算。

将结果放入list

合并list中的df,然后再按照selleridy分组同时计算amountsum值,(amount,count)sum值。

结果:

esprocesproc并行

python


并行数

耗时

esproc

不并行

113.417

esproc

2

77.001

esproc

4

54.078

python

不并行

83.724

 

3. 列出客户名单

esproc


A

1

=now()

2

=file("E:\\orders_big_data\\orders.csv").cursor@tc(clientid)

3

=A2.id(clientid)

4

=interval@ms(A1,now())

A2:游标读取clientid字段

A3cs.id(x)获取游标cs中字段x的不同值形成的序列。

python

import time

import pandas as pd

import numpy as np

s = time.time()

chunksize=1000000

order_data = pd.read_csv('E:\\orders_big_data\\orders.csv',usecols = ['clientid'],iterator=True,chunksize=chunksize)

client_set = set()

for chunk in order_data:

    client_set = client_set|set(chunk['clientid'].values)

print(pd.Series(list(client_set)))

e = time.time()

print(e-s)

定义chunksize100

pd.read_csv(),usecols参数是读取的字段,这里只读取clientid字段。

定义一个集合

按照chunksize循环数据,取chunkclientid字段的值组成集合并与原来的集合求并集,最终的集合即为客户的名单.

为了便于查看将其转成series结构

结果:

esproc

python


耗时

esproc

48.582

python

57.797

 

4. 找到每个销售员销售额最大的3笔订单

esproc


A

1

=now()

2

=file("E:\\orders_big_data\\orders.csv").cursor@tc()

3

=A2.groups(sellerid;top(3;   -amount):top3)

4

=A3.conj(top3)

5

=interval@ms(A1,now())

A3:按照sellerid分组并取amount最大的前三条记录。A.top(n;x)存在x时,返回的是记录。针对序列的每个成员计算表达式x,返回前n个最小值对应的记录。

A4:连接top3字段的序表组成新序表。

esproc并行代码


A

1

=now()

2

=file("E:\\orders_big_data\\orders.csv").cursor@tmc(;4)

3

=A2.groups(sellerid;top(3;   -amount):top3)

4

=A3.conj(top3)

5

=interval@ms(A1,now())

python

import time

import pandas as pd

import numpy as np

s = time.time()

chunksize=1000000

order_data = pd.read_csv('E:\\orders_big_data\\orders.csv',iterator=True,chunksize=chunksize)

group_list = []

for chunk in order_data:

    for_inter_list = []

    top_n = chunk.groupby(by='sellerid',as_index=False)

    for index,group in top_n:

        group = group.sort_values(by='amount',ascending=False).iloc[:3]

        for_inter_list.append(group)

    for_inter_df = pd.concat(for_inter_list,ignore_index=True)   

    group_list.append(for_inter_df)

top_n_gr = pd.concat(group_list,ignore_index=True).groupby(by='sellerid',as_index=False)

top_n_list=[]

for index,group in top_n_gr:

    group = group.sort_values(by='amount',ascending=False).iloc[:3]

    top_n_list.append(group)

top_3 = pd.concat(top_n_list)

print(top_3)

e = time.time()

print(e-s)

定义一个list,用来存放每个chunk生成的df

循环数据

定义一个循环内的list,用来存放分组以后的df

按照sellerid分组

循环分组,按照amount排序,ascending=Falese表示倒序排序,取前三个,然后将结果放入for循环内的list

合并循环内listdf

循环结束后,合并最初定义的list中的df

再次按照sellerid分组

循环分组,按照amount排序,ascending=Falese表示倒序排序,取前三个

合并这次得到结果,得到每个销售员销售额最大的 3 笔订单

结果:

esproc

python


并行数

耗时

esproc

不并行

129.989

esproc

2

85.624

esproc

4

72.040

python

不并行

224.048

 

小结:本节我们用比较大的数据进行了简单的计算,包括条件查询、分组汇总、获得唯一值和topn。从代码的复杂度和运行速度看,esproc都占据了优势,esproc可以轻松的通过并行提高运行效率,充分发挥多核cpu的优势,而python则很难做到。第四例中,python进行了多次循环、排序、合并。我尝试了使用python原生库做第4例的题目,由于一直都是对比的pandas所以这里没有重点介绍,运行效率比pandas(耗时:183.164),但仍没有esproc快,这里仅供大家参考。python可以根据内存的大小调节chunksize的大小,在内存允许的情况下chunksize越大,运行效率越高。

 

第四例,python的另一种代码

s = time.time()

seller_dic = {}

with open('E:\\orders_big_data\\orders2.csv') as fd:

    i=0

    for line in fd:

        if i ==0:

            cols = line.strip().split(',')

            i+=1

        else:

            ss = line.strip().split(',')

            if len(ss) != 5:

                continue

            orderid = ss[0]

            clientid = ss[1]

            sellerid = int(ss[2])

            amount = float(ss[3])

            date = ss[4]           

            if sellerid not in seller_dic:

                seller_dic[sellerid]={}

                seller_dic[sellerid][amount] = ss

            else:

                if len(seller_dic[sellerid])<3:

                    seller_dic[sellerid][amount] = ss

                else:

                    if amount>min(seller_dic[sellerid].keys()):

                        seller_dic[sellerid].pop(min(seller_dic[sellerid].keys()))

                        seller_dic[sellerid][amount]=ss                   

seller_list = sorted(seller_dic.items(),key=lambda x:x[0])

top_3_list = []

for item in seller_list:

    for j in sorted(item[1].items(),key=lambda x:x[0],reverse=True):

        top_3_list.append(j[1])

top_3_df = pd.DataFrame(top_3_list,columns=cols)       

print(top_3_df)

e = time.time()

print(e-s)