用 SPL 编写 AWS lambda 函数

1. 前言

利用AWS Lambda函数,可以运行代码而无需预置或管理服务器,可以直接从任何Web或移动应用程序调用函数获得运算结果,是非常便利的函数服务。在函数代码中可以读取业务数据,进行复杂运算处理后向调用者输出结果。然而数据处理和计算是一个复杂的过程,工作量大,编写、调试将会耗费大量的时间。SPL是数据计算领域的优秀工具,它能连接多种数据库、各种格式的数据文件等数据源,提供非常丰富的数据计算函数,可以用很简洁的代码编写您需要进行的计算功能,并且提供方便的可视化分步调试功能。用SPL来编写lambda函数以处理和计算数据,将会大大减少函数代码的编写工作量,提高开发效率。

2. 函数 runSPL 编写

2.1 函数功能

runSPL函数用于启动SPL运行环境,运行编写好的SPL脚本,并将结果以json格式返回给函数调用端。

2.2 函数参数

runSPL接收2个参数splxparameterssplx指定编写好的脚本文件的位置,parameterssplx脚本所需要的参数,格式为json串。

2.3 源代码

[RunSpl.java]

package com.scudata.lib.shell;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.json.JSONObject;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.scudata.app.config.ConfigUtil;
import com.scudata.app.config.RaqsoftConfig;
import com.scudata.app.config.RemoteStoreConfig;
import com.scudata.cellset.datamodel.PgmCellSet;
import com.scudata.common.Logger;
import com.scudata.dm.Env;
import com.scudata.dm.FileObject;
import com.scudata.dm.JobSpaceManager;
import com.scudata.ecloud.ide.GMCloud;
import com.scudata.ecloud.util.InitialUtil;
import com.scudata.ide.spl.Esprocx;
import com.scudata.lib.ctx.ImS3File;
import com.scudata.lib.lambda.ImUtils;

public class RunSpl implements RequestHandler<Map<String, Object>, Object> {
	private static S3Client s3Client;
	private com.scudata.dm.Context splCtx;
	static {
		InputStream stream = null;
		try {
			Logger.info("spl start........");
			//读取部署包中的SPL配置文件
			String xml = "/opt/java/config/raqsoftConfig.xml";
			File f = new File(xml);
			if (f.exists()) {
				stream = new FileInputStream(f);
				RaqsoftConfig rc = ConfigUtil.load(stream, true);
				InitialUtil.loadAcloudFunctions();  //加载Qfile等函数
				GMCloud.initQJDBC( rc );   //加载配置文件中的远程存储设置
				List<RemoteStoreConfig> list = rc.getRemoteStoreList();
				if( list != null && list.size() > 0 ) {
					RemoteStoreConfig rsc = list.get( 0 );
					JSONObject jo = new JSONObject( rsc.getOption() );
					initS3(jo.getString("accessKey"), jo.getString("secretKey"), jo.getString("region"), jo.getString("endPoint"));
				}
			}
		} catch (Throwable e) {
			Logger.error("init SPL config error: " + e);
		} finally {
			try {
				stream.close();
			} catch (Throwable th) {}
		}
	}
	
	public Object handleRequest(Map<String, Object> event, Context context) {
		String ret = "";
		try {
			return doHandle(event, context);
		} 
		catch( Throwable e) {
			ret = e.getMessage();
			Logger.error(e.getMessage());
		}
		return ret;
	}
	
	//初始化访问S3的配置
	private static void initS3(String accessKey, String secretKey, String region, String endPoint) throws Exception {
		SdkHttpClient httpClient = ApacheHttpClient.builder().build();
		if (accessKey == null && secretKey == null) {
			ImUtils.getS3Instance(region);
			return;
		}
		AwsBasicCredentials awsCreds = AwsBasicCredentials.create(accessKey, secretKey);
		StaticCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(awsCreds);
		S3ClientBuilder builder = S3Client.builder().httpClient(httpClient);
		if (region != null && !region.trim().isEmpty()) {
			Region reg = Region.of(region);
			builder.region(reg);
		}
		if (endPoint != null && !endPoint.trim().isEmpty()) {
			builder.endpointOverride(URI.create(endPoint));
		}
		s3Client = builder.credentialsProvider(credentialsProvider).build();
		ImUtils.setS3Client(s3Client);
	}

	//函数处理方法:event中接收splx和parameters两个参数,执行指定的脚本并返回运算结果
	private Object doHandle(Map<String, Object> event, Context context) throws Exception {
		String bucket = null;
		String objName = null;
		splCtx = Esprocx.prepareEnv();  //运算脚本的上下文环境
		Object o = event.get("parameters");
		parseParameterToCtx( o );
		boolean isS3File = false;
		String splx = "";
		o = event.get("splx");
		if (o != null && o instanceof String) {
			splx = o.toString();
			//splx以s3://开头的表示是存储在S3上的文件
			if( splx.toLowerCase().startsWith( "s3://" ) ) {
				String[] vs = ImUtils.splitS3String( splx );
				if (vs.length == 2) {
					bucket = vs[0];
					objName = vs[1];
				}
				isS3File = true;
			}
		}
		else throw new Exception( "Function paramter value of splx is invalid." );

		File splxFile = null;
		if ( isS3File ) {
			//从S3上下载脚本到/tmp/bucket/objName
			ImS3File s3File = new ImS3File(bucket + "/" + objName, "utf-8");
			splxFile = s3File.getLocalFile();
			System.out.println( "splxFile=" + splxFile);
			if (splxFile == null || !splxFile.exists()) {
				throw new Exception( bucket + "/" + objName + " is not existed" );
			}
		} 
		else {
			splxFile = new File( splx );
		}

		FileObject fo = new FileObject( splxFile.getAbsolutePath(), "p" );
		PgmCellSet pcs = fo.readPgmCellSet();
		try {
			//执行脚本
			pcs.setContext( splCtx );
			pcs.calculateResult();
		}
		finally {
			JobSpaceManager.closeSpace( splCtx.getJobSpace().getID() );
		}
		Object ret = pcs.nextResult();
		Logger.info("spl end ....");
		Object jsonVal = ImUtils.dataToJsonString(ret);
		return jsonVal;   //以json格式返回运算结果
	}
	
	//将参数串中的参数设置到上下文环境splCtx中
	private void parseParameterToCtx( Object param ) {
		if (param == null) return;
		if (param instanceof String) { // for jsonString
			String jsonVal = param.toString();
			JSONObject json = new JSONObject(jsonVal);
			Iterator<String> keys = json.keys();
			while( keys.hasNext() ) {
				String key = keys.next();
				splCtx.setParamValue( key, json.get( key ) );
			}
		}
	}
}

3. runSPL 部署

登录AWS帐户后,进入lambda服务页面。

3.1 创建层

lambda层(Layer)为函数提供可引用的资源,例如jar包或配置文件。runSPL创建4个层:

1、 myLayerLambdaS3:部署访问S3lambda API所需要jar文件,压缩成lambda_s3.zip文件部署包,jar文件放入zip文件的java/lib目录中,这些jar来源于esProc 外部包中的S3CliLambdaCli,可在SPL官网下载

2、 myLayerSpl:部署运行SPL所需要的jar文件,压缩成spl.zip文件部署包,jar文件放入zip文件的java/lib目录中,这些jar来自安装后的SPL企业版安装目录下lib中,除以下4个外,其余是SPL引用的第三方jar包:
esproc-bin-xxx.jar
esproc-ext-xxx.jar
esproc-ent-xxx.jar
ecloud-xxx.jar

其中xxx是日期,尽量获取最新日期的包。

3、 mySqlJdbc:部署连接Amazon RDS MySQL数据库的JDBC程序包,只有一个mysql-connector-j-8.3.0.jar文件,可从https://dev.mysql.com/downloads/connector/j/下载,放入rds-mysql.zip文件中java/lib目录。如果访问别的数据库,就用相应数据库的JDBC包。

4、 runsplConfig:部署SPL配置文件raqsoftConfig.xml以及其它相关文件。本示例中包括要用到的本地脚本文件orders.splx和本地组表数据文件orders.ctx,压缩成config.zip文件部署包,raqsoftConfig.xml文件放入java/config目录中,orders.splxorders.ctx放在根目录中。

raqsoftConfig.xml文件内容:

<?xml version="1.0" encoding="UTF-8"?>
<Config Version="3">
    <Runtime>
        <DBList encryptLevel="0">
            <DB name="rds_mysql">
                <property name="url" value="jdbc:mysql://mysql-free2.c3oygm4kw7lr.rds.cn-north-1.amazonaws.com.cn:3306/tpch?useCursorFetch=true"/>
                <property name="driver" value="com.mysql.jdbc.Driver"/>
                <property name="type" value="10"/>
                <property name="user" value="admin"/>
                <property name="password" value="esp…..p"/>
                <property name="batchSize" value="0"/>
                <property name="autoConnect" value="false"/>
                <property name="useSchema" value="false"/>
                <property name="addTilde" value="false"/>
                <property name="caseSentence" value="false"/>
            </DB>
        </DBList>
        <Esproc>
            <charSet>GBK</charSet>
            <splPathList>
                <splPath>/tmp</splPath>
            </splPathList>
            <dateFormat>yyyy-MM-dd</dateFormat>
            <timeFormat>HH🇲🇲ss</timeFormat>
            <dateTimeFormat>yyyy-MM-dd HH🇲🇲ss</dateTimeFormat>
            <mainPath>/tmp</mainPath>
            <tempPath/>
            <bufSize>65536</bufSize>
            <parallelNum>32</parallelNum>
            <cursorParallelNum>8</cursorParallelNum>
            <blockSize>1048576</blockSize>
            <nullStrings>nan,null,n/a</nullStrings>
            <fetchCount>9999</fetchCount>
            <extLibsPath></extLibsPath>
            <customFunctionFile/>
            <RemoteStores>
                <RemoteStore name="S3" type="S3" cachePath="/tmp" minFreeSpace="0" blockBufferSize="0">
               {"dataKey":"","endPoint":"https://s3.cn-north-1.amazonaws.com.cn", "secretKey":"aYI3JBZOuRGk…….vNhjDhoVQU0yN","accessKey":"AKIA……IIIXO","region":"cn-north-1","type":"s3","bCacheEnable":true}
                </RemoteStore>            
            </RemoteStores>
        </Esproc>
        <Logger>
            <Level>DEBUG</Level>
        </Logger>
    </Runtime>
</Config>

其中<DB>节是访问Amazon RDS MySql云数据库的配置信息,此节内容要根据访问的云数据库的信息编写,也可以是其它云平台的其它类型的数据库。如果不需要访问数据库,也可以不配置此节。splPathmainPath必须配置成/tmp,因为lambda虚拟机中只有/tmp目录是可写的,S3远程脚本和远程数据文件下载过来后存放在/tmp中。cursorParallelNum配置SPL脚本中多路并行函数的并行数。RemoteStore是访问你的S3的配置信息(如果不需要用到S3,也可以不配置)cachePath也配置为/tmp

每个层的创建过程都类似的,下面以myLayerLambdaS3层创建为例讲解:

如下图所示,进入lambda服务页面后,点击左边导航菜单Layer,然后点击右边的Create layer按钮。

..

然后进入下图所示的页面:

..

输入NameDescription(可不写),选择Upload a .zip file,点击Upload按钮,选择压缩好的lambda_s3.zip,选择x86_64架构,Compatible runtimes选择Java17Java11Java8,最后点击Create

如果zip压缩包大于50M,则不能直接在这里上传,需要先将zip上传到S3保存,然后在这个页面选择Upload a file from Amazon S3,并输入zip文件在S3上的URL地址。

如果层的zip部署包有修改,那么需要在层列表页面中点击这个层,然后点击Create version按钮,重新上传zip部署包。

..

全部层创建好以后,层列表如下;

.. 其中 runsplConfig层在测试过程中已经创建过新版本了。

3.2 创建函数

编译runSPL函数相关的Java类,然后将所有class文件打包成runSpl.jar文件。

lambda服务页面选择左边导航菜单Functions,然后点击右边的Create function按钮。

..

然后进入下图所示的创建函数页面:

..

输入函数名runsplRuntime选择Java 11,选择x86_64 Architecture,然后点击Create function按钮进入下图页面。

..

Code tabpage中点击Upload from下拉列表,选择".zip or .jar file",弹出下图窗口:

..

点击Upload按钮,选择已经打好的runSpl.jar文件,点击Save按钮。

Code tabpage中,点击Runtime settings中的Edit按钮:

..

进入下图页面:

..

Handler输入框填写函数的类名和处理方法名,点击Save按钮。返回Code tabpage以后,点击Add a layer按钮

..

进入下图添加层页面:

..

选择Custom layers,从下拉框Custom layers选择myLayerLambdaS3层,Version选择1,点击Add按钮。

重复添加层的操作,依次添加myLayerSplmySqlJdbcrunsplConfig等层。

然后进入Configuration tabpage

..

点击Edit按钮:

..

根据实际需求修改函数需要使用的MemoryEphemeral storageTimeout(2者的大小与AWS收取运行费用额有关),点击Save按钮。

4. 调用函数

4.1 调用方法

写一个用Java程序调用runSPL

public static String callRunSpl( String splx, String paramJson ) throws Exception {
    Region region = Region.CN_NORTH_1;
    AwsCredentials c = AwsBasicCredentials.create( "AKIATA……IIIXO", "aYI3JBZOuR……3FvNhjDhoVQU0yN" );
    StaticCredentialsProvider credential = StaticCredentialsProvider.create(c);
    LambdaClient awsLambda = LambdaClient.builder().credentialsProvider(credential)
    		.httpClient(ApacheHttpClient.builder().socketTimeout( Duration.ofMinutes( 10 ) ).build()).region(region).build();
    JSONObject jo = new JSONObject();
    jo.put( "splx", splx );
    if( paramJson != null ) jo.put( "parameters", paramJson );
    SdkBytes payload = SdkBytes.fromUtf8String( jo.toString() );
    InvokeRequest request = InvokeRequest.builder()
    	.functionName( " arn:aws:lambda:us-west-1:91……708:function:runspl " )
    	.payload(payload).build();
    InvokeResponse res = awsLambda.invoke( request );
    String result = res.payload().asUtf8String();
    awsLambda.close();
    return result;
}

方法中的两个参数splxparamJson分别对应runSPL函数中的参数splxparameters

2行中的region是创建runSPL函数时使用AWS帐户所在的区域,第3行中是登录帐户的accessKeysecretKeyfunctionName方法中被调用函数的名称来源于下图所示:

..

4.2 S3脚本用MySQL表数据计算

Amazon RDS(Amazon Relational Database Service)云数据库管理系统中创建一个MySql数据库实例,并创建TPCH中的orders表,向表中导入数据(略去创建和导入数据的过程,使用JDBC访问其它云数据库的用法与本例类同)。编写使用orders表中数据进行计算的脚本rds-mysql.splx如下:


A

1

=connect("rds_mysql")

2

=A1.cursor@m("SELECT O_ORDERDATE, O_ORDERSTATUS, O_TOTALPRICE FROM ORDERS where O_ORDERKEY<3000000")

3

=A2.groups(year(O_ORDERDATE):year,O_ORDERSTATUS:status;sum(O_TOTALPRICE):amount)

4

=A1.close()

5

return A3

此脚本不需要参数,将脚本文件放在S3un-bucket103存储桶中,函数调用语句如下:

String result = callRunSpl("s3://un-bucket103/rds-mysql.splx", null);

返回的result结果如下:

[{"year":1992,"status":"F","amount":17215923334.65},

{"year":1993,"status":"F","amount":17081696141.77},

{"year":1994,"status":"F","amount":17240680768.02},

{"year":1995,"status":"F","amount":3312513758.50},

{"year":1995,"status":"O","amount":10389864034.13},

{"year":1995,"status":"P","amount":3559186116.08},

{"year":1996,"status":"O","amount":17300963945.13},

{"year":1997,"status":"O","amount":17076455987.72},

{"year":1998,"status":"O","amount":10169293696.22}]

4.3 S3脚本用S3中组表数据文件计算

orders.ctx组表数据文件放在S3上的un-bucket103存储桶中,编写使用此组表中数据进行计算的脚本s3_orders.splx如下:


A

1

=Qfile("un-bucket103/orders.ctx")

2

=date(startDate)

3

=date(endDate)

4

=A1.open().cursor@m(O_ORDERDATE, O_ORDERSTATUS, O_TOTALPRICE;O_ORDERDATE>=A2 && O_ORDERDATE<=A3)

5

=A4.groups(year(O_ORDERDATE):year,O_ORDERSTATUS:status;sum(O_TOTALPRICE):amount)

6

return A5

此脚本定义了2个参数表示查询订单的开始日期和结束日期:

..

将脚本文件放在S3un-bucket103存储桶中,函数调用语句如下:

String result = callRunSpl("s3://un-bucket103/s3_orders.splx", "{\"startDate\":\"1995-01-01\",\"endDate\":\"1997-12-31\"}");

返回的result结果如下:

[{"year":1995,"status":"F","amount":6.614961429260002E9},

{"year":1995,"status":"O","amount":2.082205436133004E10},

{"year":1995,"status":"P","amount":7.109117393009993E9},

{"year":1996,"status":"O","amount":3.460936476086E10},

{"year":1997,"status":"O","amount":3.437363341303998E10}]

4.4 本地脚本使用本地组表数据文件计算

orders.ctx组表数据文件放在runsplConfig层部署包config.zip的根目录中,编写使用此组表中数据进行计算的脚本orders.splx如下:


A

1

=file("/opt/orders.ctx")

2

=date(startDate)

3

=date(endDate)

4

=A1.open().cursor@m(O_ORDERDATE, O_ORDERSTATUS, O_TOTALPRICE;O_ORDERDATE>=A2 && O_ORDERDATE<=A3)

5

=A4.groups(year(O_ORDERDATE):year,O_ORDERSTATUS:status;sum(O_TOTALPRICE):amount)

6

return A5

zip部署包会解压到运行runSPL函数的机器/opt目录中,所以组表文件路径是/opt/orders.ctx,访问本地文件,使用file函数。除此之外,此脚本前例一样。

此脚本文件也部署在config.zip压缩包中(也可以放在S3),函数调用语句如下:

String result = callRunSpl("/opt/orders.splx", "{\"startDate\":\"1995-01-01\",\"endDate\":\"1997-12-31\"}");

返回的result与前例相同。