MongoDB 上的计算库

MongoDB 内置的查询表达式有时不够方便,这种情况下就要用到第三方函数库完成计算,本文深度比较四类 MongoDB 上的计算库,包括 MongoDB Connectioin\Calcite\Scala\ 集算器 SPL,重点考察这些工具在语法表达和部署配置方面的差异,详情点击MongoDB 上的计算库

MongoDB内置json风格的查询表达式,但有时候用起来不太方面,这种情况下我们要把数据从MongoDB取出来,用外部的第三方库函数完成计算。下面将对比MongoDB上的几种计算库,尤其是语法表达和部署配置方面的区别。

MongoDB Connector

这是MongoDB官方提供的计算库,主要功能是模拟MySQL服务,负责SQLjson查询表达式的翻译,对上接收ODBCJDBCSQL请求,对下用json查询表达式访问MongoDB

Connector支持基本的SQL语法,下面举例说明。MongoDB有名为test1collection,大多数字段为简单类型,用来存储员工信息,Orders字段为数组类型,用来存储当前员工的多个订单。部分数据如下:

[{

      "_id": {"$oid":   "6074f6c7e85e8d46400dc4a7"},

      "EId": 7,"State":   "Illinois","Dept": "Sales","Name":   "Alexis","Gender": "F","Salary":   9000,"Birthday": "1972-08-16",

      "Orders": [

         {"OrderID":   70,"Client": "DSG","SellerId":   7,"Amount": 288,"OrderDate": "2009-09-30"},

         {"OrderID":   131,"Client": "FOL","SellerId":   7,"Amount": 103.2,"OrderDate": "2009-12-10"}

    ]

}

{

      "_id": {"$oid":   "6074f6c7e85e8d46400dc4a8"},

      "EId": 8,"State": "California", ...

}]

下面用SQL嵌入JAVA代码,实现针对订单表的条件查询。

package mon;
  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.Statement;
  public class Main {
      public static void main(String[]   args)throws Exception {
            Class.forName("com.mysql.jdbc.Driver");
          Connection connection   =DriverManager.getConnection("jdbc:mysql://127.0.0.1:3307?source=mongo&mechanism=PLAIN&useSSL=false&authenticationPlugins=org.mongodb.mongosql.auth.plugin.MongoSqlAuthenticationPlugin");
          Statement statement =   connection.createStatement();
          String str="SELECT * FROM   mongo.test1_orders where `Orders.Amount`>1000 and `Orders.Amount`<=3000   and `Orders.Client` like'%S%' ";
          ResultSet result =   statement.executeQuery(str);


          if(connection != null)   connection.close();
      }
  }

类似地,只需修改SQL语句,还可以实现分组汇总和条件查询:

str="SELECT    year(`Orders.Orderdate`) y,sum(`Orders.Amount`) s FROM mongo.test1_orders  group by year(`Orders.Orderdate`)";

str= "SELECT    o.`Orders.OrderID`,o.`Orders.Client`,o.`Orders.SellerId`,o.`Orders.Amount`,o.`Orders.OrderDate`,e.Name,e.Gender,e.Dept   from mongo.test1_Orders o, mongo. test1 e where   o.`Orders.SellerId`=e.EId";

上面代码中,Orders.Orderdate是子文档的默认字段名,虽然里面用到了点号,但实际上SQL不支持多层数据类型,所以Orders.Orderdate只是外观像主子关系(可在元数据文件中重定义),但实际并不会分别解析。事实上,Connectorcollection test1识别为2个独立的表,一个是不含子文档的table test1,另一个是只有子文档的table test1_Orders。这就导致SQL必须再此(额外)建立关联关系,而不能利用collection原有的天然主子关系。显然,这样的计算效率并不高。

除了不支持多层结构这种通用的SQL缺点之外,Connector 本身也是各类SQL中表达能力较弱的,比如不支持窗口函数。事实上,官网已经明确说MongoDB Connector只适合一些BI工具的基本需求。

由于是官方产品,所以MongoDB Connector的集成和配置都很简单。安装本计算库后,只需在命令行执行如下命令,即可启动数据库服务:

mongosqld --mongo-uri   "mongodb://localhost:27017/?connect=direct" --addr   "127.0.0.1:3307"

Calcite

 Calcite的理想是用SQL语言计算任意数据源,其中就包括MongoDB。遗憾的是,Calcite on MongoDB的文档少且粗,有些功能没有找到具体说明,这导致下面的描述可能不准确。

Calcite只能以collection为单位取数,如果colleciton较大,就很容易内存溢出(Calcite只支持内存计算)。Calcite不能从多层collection中取数,比如查询前面collection test1里的子文档。为了迁就Calcite,这里将test重整成2个单层collection,即EmployeesOrders

使用Calcitecollection Orders进行条件查询时,代码如下:

package org.example;
  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.Statement;
  import java.util.Properties;
  public class App
  {
      public static void main(String[]   args ) throws Exception{
          Properties config = new   Properties();
          config.put("model",   "d:\\mongo-model.json");
          config.put("lex",   "MYSQL");

          Connection con =   DriverManager.getConnection("jdbc:calcite:", config);
          Statement stmt =   con.createStatement();

        String   sql ="select * from orders where Amount>1000 and   Amount<=3000";
          ResultSet rs =   stmt.executeQuery(sql);
          …
          if(con!= null) con.close();
      }
  }

应该注意到,这里的条件查询语句简化了,没有模糊查询部分(其他计算库都有),这是因为Calcite还不支持模糊查询。类似地,Calcite也不支持取年份的函数,或字符串和日期的转换函数,所以前面的分组汇总无法实现,只能改写成下面这样:

sql="select Client, sum(Amount) from orders group by Client";

Calciteshe对关联计算的支持也不好,比如不能取部分字段,只能用*号取全部字段,所以前面的关联查询无法实现,只能改写成下面这样:

sql="SELECT * from Orders,Employees where   Orders.SellerId=Employees.EId";

Calcite的配置分两部分,首先在Maven中引入calcite-mongodb,之后建立元数据文件mongo_model.json,具体内容如下:

{

    "version": "1.0",

  "defaultSchema":   "dSchema",

    "schemas": [

   {

        "type": "custom",

        "name": "alias",

        "factory":   "org.apache.calcite.adapter.mongodb.MongoSchemaFactory",

        "operand": {

          "host": "localhost:27017",

          "database": "mongo"

      }

    },

    {

        "name": "dSchema",

        "tables": [

             {

            "name": "orders",

            "type": "view",

            "sql": "select cast(_MAP['OrderID'] AS integer)AS   OrderID,cast(_MAP['Client'] AS varchar(40)) AS Client,cast(_MAP['SellerId']   AS integer)AS SellerId,cast(_MAP['Amount'] AS float)AS   Amount,cast(_MAP['OrderDate'] AS varchar(20)) AS OrderDate from   \"alias\".\"Orders\""

        },

       {

            "name": "employees",

            "type": "view",

            "sql": "select cast(_MAP['EId'] AS integer)AS   EId,cast(_MAP['State'] AS varchar(40)) AS State,cast(_MAP['Dept'] AS   varchar(40)) AS Dept,cast(_MAP['Name'] AS varchar(40)) AS   Name,cast(_MAP['Gender'] AS varchar(40)) AS Gender,cast(_MAP['Salary'] AS   float)AS Salary,cast(_MAP['Birthday'] AS    varchar(20)) AS Birthday from   \"alias\".\"Employees\""

        }

      ]

    }

  ]

}

上面配置中,\"alias\".\"Orders\"是物理表名,orders是对应的视图名。理论上不用配置视图,只需在代码中直接查物理表即可,但实际上直接查物理表会导致很多SQL错误(比如分组汇总),这很可能是Calcite不够完善导致的。

Scala

Scala是常用的结构化计算语言,对MongoDB支持较早,其原理是:先从MongoDB读取collection,存储为ScalaDataFrame数据对象(或RDD),再用DataFrame的通用计算能力完成计算。

Scala计算库存在一些先天缺点。Scala只能以collection为单位取数,不支持用mongoDBjson查询表达式取数,如果colleciton数据量较大,则取数会花费大量时间。Scala不能从多层collection中取数,如果想计算MongoDB中的多层collection,则必须改造成多个单层collection。比如前面例子中的test1必须拆成2个单层的OrdersEmployees

使用Scalacollection Orders进行条件查询的代码如下:

package test

import org.apache.spark.sql.SparkSession

import com.mongodb.spark.config._

import com.mongodb.spark.sql.toSparkSessionFunctions

object Mon {

  def   main(args: Array[String]): Unit = {

    val   warehouseLocation = "file:${system:user.dir}/spark-warehouse"

    val spark   = SparkSession.builder()

        .master("local")

        .appName("MongoDB Test")

        .getOrCreate()

 

    val Orders   = spark.loadFromMongoDB(ReadConfig(

        Map("uri" -> "mongodb://127.0.0.1:27017/mongo.Orders")

    ))

    val   condtion=Orders.where("Amount>1000 and Amount<=3000 and Client   like'%S%' ")

      condtion.show()

  }

}

类似地,还可以实现分组汇总和关联计算:

//分组汇总

val groupBy=Orders.groupBy(year(Orders("OrderDate"))).agg(sum("Amount"))

//关联计算

val Employees = spark.loadFromMongoDB(ReadConfig(

        Map("uri" ->   "mongodb://127.0.0.1:27017/mongo.employees")

    ))

val   join=Orders.join(Employees,Orders("SellerId")===Employees("EId"),"Inner")
      .select("OrderID","Client","SellerId","Amount","OrderDate","Name","Gender","Dept")

应该注意到,虽然必须拆分多层的collection才能取数,但只要取到数据,DataFrame的计算能力还是非常强的,这便是通用数据结构的好处。

配置方面,对于程序员来说非常简单,只需在Maven加入org.mongodb.spark即可。

集算器 SPL

集算器 SPL也是专业的开源结构化计算引擎,原理和Calcite类似,可以用统一的语法和数据结构计算各类数据源,其中就包括MongoDB。但集算器 SPL更“轻”,层次更少,语法更简单,对MongoDB的支持也更成熟。

比如对多层collection test1进行条件查询,SPL代码写作:


A

1

=mongo_open("mongodb://127.0.0.1:27017/mongo")

2

=mongo_shell(A1,"test1.find()")

3

=A2.conj(Orders)

4

=A3.select(Amount>1000 &&   Amount<=3000 && like@c(Client,"*s*")).fetch()

5

=mongo_close(A1)

A2可以看出来,SPL支持MongoDBjson查询表达式(findcountdistinctaggregate),比如区间查询写作:=mongo_shell(A2,"test1.find({Orders.Amount:{$gt:1000,$lt:3000}})")

collection数据较多且json表达式较简单的时候,可以通过这种方式减少取到的数据,以防内存溢出;也可以加快查询速度,比如针对索引的查询。如果取到的数据依旧很多,SPL也能轻松处理,因为A2返回的是游标类型,可以计算超出内存的数据。

上述代码可在IDE中执行,也可以存为脚本文件(比如select.dfx),通过JDBC接口在JAVA中调用,具体如下:

package Test;
  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.Statement;
  public class test1 {
      public static void main(String[]   args)throws Exception {
            Class.forName("com.esproc.jdbc.InternalDriver");
          Connection connection   =DriverManager.getConnection("jdbc:esproc:local://");
          Statement statement =   connection.createStatement();
          ResultSet result =   statement.executeQuery("call select()");

……
          if(connection != null)   connection.close();
      }
  }

 

类似地,分组汇总代码如下:


A

1

=mongo_open("mongodb://127.0.0.1:27017/mongo")

2

=mongo_shell(A1,"test1.find()")

3

=A2.conj(Orders).groups(year(OrderDate);sum(Amount))

4

=mongo_close(A1)

关联查询代码如下:


A

1

=mongo_open("mongodb://127.0.0.1:27017/mongo")

2

=mongo_shell(A1,"test1.find()")

3

=A2.new(Orders.OrderID,Orders.Client,   Name,Gender,Dept).fetch()

4

=mongo_close(A1)

这里要注意的是,SPL的关联代码要比其他计算库都要简单(实际没有关联动作),甚至比MongoDB官方产品Connector简单。这是因为SPL的数据结构本身就是多层的,可以直接对应test1这种多层的collection,可以天然表达主子关系,如此一来就不必再额外进行关联。而其他计算库都是单层数据结构,难以对应多层collection

当然,SPL也支持2个单层collection的关联:


A

1

=mongo_open("mongodb://127.0.0.1:27017/mongo")

2

=mongo_shell(A1,"Orders.find()").fetch()

3

=mongo_shell(A1,"Employees.find()").fetch()

4

=mongo_close(A1)

5

=join(A2,SellerId;A3,EId)

6

=A5.new(_1.OrderID,_1.Client,_2.Name,_2.Gender,_2.Dept)

SPL表达形式多样,除了本身的过程化语法,还支持SQL语法。因为SQL数据类型不支持多层数据(参考CalciteConnector),所以只支持2个单层collection的关联,代码如下:


A

1

=mongo_open("mongodb://127.0.0.1:27017/mongo")

2

=mongo_shell(A34,"Orders.find()").fetch()

3

=mongo_shell(A34,"Employees.find()").fetch()

4

=mongo_close(A34)

5

$select o.OrderId,o.Client,e.Name,e.Gender,e.Dept from   {A35}  o join {A36} e on   o.SellerId=e.EId

MongoDB的特色是多层数据,用json表达式计算多层数据会遇到很多困难,这种情况下SPL做计算库经常可以简化计算。比如:统计下面每条记录中 incomeoutput 的数量之和。

_id

income

output

1

{"cpu":1000, "mem":500,     "mouse":"100"}

{"cpu":1000, "mem":600   ,"mouse":"120"}

2

{"cpu":2000,"mem":1000,    "mouse":"50","mainboard":500 }

{"cpu":1500, "mem":300}

json表达式计算时,代码很繁琐:

var fields = [  "income",   "output"];

db.computer.aggregate([ 

   { 

      $project:{ 

           "values":{ 

              $filter:{ 

                 input:{ 

                      "\$objectToArray":"$$ROOT"

                 },

                 cond:{ 

                    $in:[ 

                       "$$this.k",

                       fields

                    ]

                 }

              }

         }

      }

   },

   { 

      \$unwind:"$values"

   },

   { 

      $project:{ 

           key:"$values.k",

           values:{ 

              "$sum":{ 

                 "$let":{ 

                    "vars":{ 

                       "item":{ 

                            "\$objectToArray":"$values.v"

                       }

                    },

                      "in":"$$item.v"

                 }

              }

         }

      }

   },

   {$sort: {"_id":-1}},

   { "$group": {

    "_id": "$_id",

    'income':{"\$first":     "$values"},

    "output":{"\$last":     "$values"}

    }},

]);

SPL计算就简单多了:


A

1

=mongo_open("mongodb://127.0.0.1:27017/raqdb")

2

=mongo_shell(A1,"computer.find()").fetch()

3

=A2.new(_id:ID,income.array().sum():INCOME,output.array().sum():OUTPUT)

4

>A1.close()

最后说下集算器 SPL的配置。在Extend library中启用MongoCli即可完成配置,配置时可用图形界面。

通过上述比较可以看出:在语法表达方面,集算器 SPL对多层数据支持较完美;Connector可以将多层数据识别为多个表,基本能用;scala要将多层数据改造成单层,成本非常高;Calcite不仅改造成本高,而且不够成熟稳定。在配置部署方面,Connector最为方便,集算器 SPLScala也比较简单,Calcite依旧垫底。