A Comparison of esProc Composite Table, ORC and Parquet

1. Introduction

Big data technology has given rise to a number of columnar storage formats, and a suitable storage scheme is the foundation of high-performance computing. This article compares three open-source columnar file formats, the esProc composite table, ORC, and Parquet, from an application point of view, focusing on data compression and read performance.

2. Preparing the Data

Storage structure of the inbound table:

No.   Field name   Field type
1     pid          long
2     store        string
3     product      int
4     indate       date
5     num          int

 
Generate 200 million rows of the inbound table as txt text data, store them locally as an esProc composite table, an ORC file and a Parquet file, compare the resulting file sizes, and test the corresponding read performance. Following the way Hive tables store dates, the indate field is converted to a numeric value.
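For reference, the dateToDays converters in the Java code later in this article produce exactly this numeric value: the number of days since 1970-01-01, which is also how the ORC DATE type used by Hive stores a date. A minimal Java sketch of the round trip (the sample date is only an illustration):

import java.time.LocalDate;

public class DateAsDays {
   public static void main(String[] args) {
      // A date value is stored as the number of days since the Unix epoch (1970-01-01).
      LocalDate indate = LocalDate.of(2015, 10, 1);    // a sample value from the generated date range
      long days = indate.toEpochDay();                 // 16709
      LocalDate restored = LocalDate.ofEpochDay(days); // back to 2015-10-01 when reading
      System.out.println(days + " -> " + restored);
   }
}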
 

An esProc SPL script is used to generate the txt text data, and the other file formats under test are then produced from this text data.

3. Generating the Data

The inbound table data is generated in random order and saved as an esProc composite table, an ORC file and a Parquet file respectively.

3.1 TXT data

The SPL script that generates the TXT data is as follows:

   | A | B | C | D
 1 | /uuid (segment data) | /segment index | /segment length | /sequence number within the segment
 2 | [] | 1 | 0 | 0
 3 | [[300000,400000],[50000,100000],[600000,700000],[100000,200000],[200000,300000],[1,50000],[400000,500000],[900000,1000000],[500000,600000],[800000,900000],[700000,800000]] | 0 | /B3: segment start position |
 4 | for 220 | for A3 | =[[B4(1)+(A4-1)*1000000, B4(2)+(A4-1)*1000000]] |
 5 |  |  | =A2=A2.insert(0, C4) |
 6 | 1000 | 100 | 1500 | 10000
 7 | 200000000 | 20 | 60 | 10
 8 | 0 | =date(2015,10,1) | =date(2016,9,30) | 100
 9 | =create(pid, store, product, indate, num) |  |  |
10 | =file("H:/tmp/inbound.txt") |  |  |
11 | func genOne(store,product) | ="store_id_"+mid(string(store+1000000),2) |  |
12 |  | =B7+rand(C7-B7) | =to(1000).(rand(interval(B8,C8))).id@u().m(to(B12)).sort() |
13 |  | for B12 | =elapse(B8,C12(B13)) | =D7+rand(D8-D7)
14 |  |  | >A8=A8+1 |
15 |  |  | if D2>=C2 | >B3=A2(B2)(1)
16 |  |  |  | >C2=A2(B2)(2)-B3
17 |  |  |  | >B2=B2+1
18 |  |  |  | =D2=0
19 |  |  | >A9.insert(0,(D2+B3), B11,product,C13,D13) |
20 |  |  | >D2=D2+1 |
21 | for D6 | for A6 | =func(genOne,A21,B21) |
22 |  |  | if (A8>=A7) | >A10.export@a(A9.cursor())
23 |  |  |  | break A21
24 |  | >A10.export@at(A9.cursor()) | >A9.reset() |
 
Generating 200 million rows into the txt file produces a file of 8448 MB. The sequence in A2 is used to help generate unordered, non-repeating numbers that serve as the key values.
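The idea behind the A2 sequence can also be illustrated outside SPL: the key space is split into fixed-size segments, the segments are visited in a shuffled order, and keys are issued sequentially inside each segment, so every key is unique while the overall stream is unordered. A rough Java sketch of the same idea (the class name, segment size and segment count are illustrative, not the script's exact values):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class UnorderedUniqueKeys {
   public static void main(String[] args) {
      final int segmentSize = 100000;   // illustrative segment length
      final int segmentCount = 20;      // illustrative number of segments

      // Build the segment start offsets and shuffle them.
      List<Long> starts = new ArrayList<>();
      for (int i = 0; i < segmentCount; i++) {
         starts.add((long) i * segmentSize + 1);
      }
      Collections.shuffle(starts);

      // Emit keys segment by segment: unique overall, unordered globally.
      long emitted = 0;
      for (long start : starts) {
         for (long k = start; k < start + segmentSize; k++) {
            emitted++;   // here k would become the pid of one generated record
         }
      }
      System.out.println("generated " + emitted + " unique keys");
   }
}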

3.2 esProc composite table

The SPL script that generates the composite table data:
  | A | B | C
1 | =now() | 0 |
2 | =file("H:/tmp/inbound.txt").cursor@t() |  |
3 | =file("H:/tmp/data3/inbound.ctx").create@y(#pid, store, product, indate, num) |  |
4 | for A2, 50000 | >A3.append(A4) |
5 |  | =B1=B1+A4.len() |
6 |  | if B1%5000000==0 | >output(B1)
7 | >A3.close() |  |
8 | >output("sum =" / B1) |  |
9 | =interval@ms(A1,now()) |  |


Converting the text data into a composite table produces a ctx file of 608 MB.

3.3 ORC

A Java program is used to convert the txt text data into an ORC file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DateColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.CompressionKind;

import com.scudata.common.Logger;
import com.scudata.dm.BaseRecord;
import com.scudata.dm.Context;
import com.scudata.dm.FileObject;
import com.scudata.dm.Sequence;
import com.scudata.dm.cursor.ICursor;
// IParam, ParamParser and CreateCursor are esProc (SPL) classes used below to build a
// cursor over the text file; the package paths here are assumptions and may vary by esProc version.
import com.scudata.expression.IParam;
import com.scudata.expression.ParamParser;
import com.scudata.expression.mfn.file.CreateCursor;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Date;
 
public class CreateOrcFile {
   private TypeDescription m_schema;
   Writer m_writer;
 
   public CreateOrcFile(String destFile) {
      try {
         /* Define the table schema */
         String struct = "struct<pid:int,store:string,product:int,indate:date,num:int>";
         m_schema = TypeDescription.fromString(struct);

         File f = new File(destFile);
         if (f.exists()) {
            f.delete();
         }
         m_writer = getWriter(destFile);
      } catch (IOException e) {
         Logger.error("CreateOrcFile:: " + e);
      }
   }
 
  public static void main(String[] args) throws IOException {
     String srcFile = "H:/tmp/data3/inbound.txt";
     String destFile = "h:/tmp/data3/out_inbound.orc";
     CreateOrcFile cls = new CreateOrcFile(destFile);
     cls.writeToOrcFile(srcFile);
  }
  
  /* Convert the text data into the ORC file */
  private void writeToOrcFile(String srcFile) {
   try {
    Sequence tab = null;
    FileObject oRet = new FileObject(srcFile);
    // Build an esProc cursor over the comma-separated text file.
    String exp = String.format(";,\",\"");
    IParam param = ParamParser.parse(exp, null, null);
    ICursor c = CreateCursor.createCursor("", oRet, null, param, "", new Context());
 
    long total = 0;
    int unitSize = 10000;
    VectorizedRowBatch batch = m_schema.createRowBatch();
    LongColumnVector pid = (LongColumnVector) batch.cols[0];
    BytesColumnVector store = (BytesColumnVector) batch.cols[1];
    LongColumnVector product = (LongColumnVector) batch.cols[2];
    DateColumnVector indate = (DateColumnVector) batch.cols[3];
    LongColumnVector num = (LongColumnVector) batch.cols[4];
    
    while(null!=(tab = (Sequence) c.fetch(unitSize)) ) {
      total += tab.length();
      for(int i=0; i<tab.length(); i++) {
      BaseRecord sq = (BaseRecord)tab.get(i+1);
         
      int row = batch.size++;
      pid.vector[row] = Long.parseLong(sq.getFieldValue(0).toString());
      byte[] buffer = ((String)sq.getFieldValue(1)).getBytes(StandardCharsets.UTF_8);
      store.setRef(row, buffer, 0, buffer.length);
      product.vector[row] = (Integer)sq.getFieldValue(2);
      Date d = (Date)sq.getFieldValue(3);
      indate.vector[row] = dateToDays(d);
      num.vector[row] = Long.parseLong(sq.getFieldValue(4).toString());
 
      if (batch.size == batch.getMaxSize()) {
        m_writer.addRowBatch(batch);
        batch.reset();
      }      
     }
    
     if (total %500000==0) {
       System.out.println("idx = "+total);
     }
    
     if (tab.length()<unitSize) {
       break;
     }
    }
    if (batch.size != 0) {
      m_writer.addRowBatch(batch);
    }
    
    m_writer.close();
    } catch (Exception e) {
      System.out.println("writeToOrcFile: " + e);
    }
  }
  
  /* Convert a date to days since the Unix epoch */
  public static int dateToDays(Date date) { 
    int days = 0;
    ZoneId zoneId = ZoneId.systemDefault();
    int offsetSeconds = zoneId.getRules().getOffset(Instant.now()).getTotalSeconds();
    long zonems = offsetSeconds * 1000;
    long milliseconds = date.getTime()+zonems;
    days = (int)(milliseconds / (1000 * 60 * 60 * 24));
    return days;
  }
 
  /* Create the ORC writer (Snappy compression) */
  private Writer getWriter(String filePath) throws IOException {    
     return OrcFile.createWriter(new Path(filePath), 
      OrcFile.writerOptions(new Configuration()).
        setSchema(m_schema).
        compress(CompressionKind.SNAPPY)); 
  }
}

Converting the text data into an ORC file produces a file of 513.9 MB.
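The 513.9 MB above is obtained with Snappy compression, as configured in getWriter(). Other codecs trade file size against CPU; as a sketch, a hypothetical variant of that method only needs a different CompressionKind (ZLIB is available in all ORC releases, LZ4 and ZSTD only in newer ones):

  /* Hypothetical variant of getWriter() above with a different compression codec */
  private Writer getZlibWriter(String filePath) throws IOException {
     return OrcFile.createWriter(new Path(filePath),
        OrcFile.writerOptions(new Configuration())
           .setSchema(m_schema)
           .compress(CompressionKind.ZLIB));   // usually smaller than SNAPPY, slower to write
  }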

3.4 Parquet

As above, a Java program is used to generate the data.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
 
import com.scudata.dm.BaseRecord;
import com.scudata.dm.Context;
import com.scudata.dm.FileObject;
import com.scudata.dm.Sequence;
import com.scudata.dm.cursor.ICursor;
// IParam, ParamParser and CreateCursor are esProc (SPL) classes used below to build a
// cursor over the text file; the package paths here are assumptions and may vary by esProc version.
import com.scudata.expression.IParam;
import com.scudata.expression.ParamParser;
import com.scudata.expression.mfn.file.CreateCursor;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Date;
 
public class CreateParquetFile {
	private static final MessageType FILE_SCHEMA;
	private static final String TABLE_NAME = "inbound_parquet";
	ParquetWriter<Group> m_writer;
 
  static {
     // Define the table schema
     Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();
     messageTypeBuilder
        .optional(PrimitiveType.PrimitiveTypeName.INT64)
        .named("pid")
        .optional(PrimitiveType.PrimitiveTypeName.BINARY)
        .as(LogicalTypeAnnotation.stringType())
        .named("store")
        .optional(PrimitiveType.PrimitiveTypeName.INT32)
        .named("product")
        .optional(PrimitiveType.PrimitiveTypeName.INT32)
        .named("indate")               // stored as days since the epoch (see dateToDays)
        .optional(PrimitiveType.PrimitiveTypeName.INT64)
        .named("num");
     FILE_SCHEMA = messageTypeBuilder.named(TABLE_NAME);
  }
 
  public CreateParquetFile (String outFilePath){
     try {
       File f = new File(outFilePath);
       if (f.exists()) {
         f.delete();
       }
       m_writer = getWriter(outFilePath);
     } catch (IOException e) {
        e.printStackTrace();
     }
  }
  
  /* Create the Parquet writer (Snappy compression) */
  private static ParquetWriter<Group> getWriter(String filePath) throws IOException {
     Path path = new Path(filePath);
     return ExampleParquetWriter.builder(path)
       .withWriteMode(ParquetFileWriter.Mode.CREATE)
       .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
       .withCompressionCodec(CompressionCodecName.SNAPPY)
       .withConf(new Configuration())
       .withType(FILE_SCHEMA).build();
  }
 	
  /* Convert the text data into the Parquet file */
  private void writeToParquetFile(String sfile) {
    try {
       FileObject oRet = new FileObject(sfile);
       // Build an esProc cursor over the comma-separated text file.
       String exp = String.format(";,\",\"");
       IParam param = ParamParser.parse(exp, null, null);
       ICursor c = CreateCursor.createCursor("", oRet, null, param, "", new Context());
       Sequence tab = null;
       long total = 0;
       while (null != (tab = (Sequence) c.fetch(10000))) {
          total += tab.length();
          writeToFile(tab);
          if (total % 500000 == 0) {
             System.out.println("idx = " + total);
          }
          if (tab.length() < 10000) {
             break;
          }
       }
       m_writer.close();
    } catch (Exception e) {
       System.out.println("writeToParquetFile: " + e);
    }
  }
 	
  public static int dateToInt(Date date) {
     long milliseconds = date.getTime();
     return (int) (milliseconds / 1000);
  }
 
  /* Write one batch of records to the Parquet file */
  private void writeToFile(Sequence tab) throws IOException {
    SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(FILE_SCHEMA);
    Group group = null;
    for (int i = 0; i < tab.length(); i++) {
       BaseRecord sq = (BaseRecord) tab.get(i + 1);
       try {
          group = simpleGroupFactory.newGroup();
          group.append("pid", Long.parseLong(sq.getFieldValue(0).toString()));
          group.append("store", (String) sq.getFieldValue(1));
          group.append("product", (Integer) sq.getFieldValue(2));
          Date d = (Date) sq.getFieldValue(3);
          group.append("indate", dateToDays(d));   // stored as days since the epoch
          group.append("num", Long.parseLong(sq.getFieldValue(4).toString()));
          m_writer.write(group);
       } catch (Exception e) {
          System.out.println("writeToFile: " + e);
       }
    }
  }

  /* Convert a date to days since the Unix epoch */
  public static int dateToDays(Date date) {
     ZoneId zoneId = ZoneId.systemDefault();
     int offsetSeconds = zoneId.getRules().getOffset(Instant.now()).getTotalSeconds();
     long zonems = offsetSeconds * 1000;
     long milliseconds = date.getTime() + zonems;
     return (int) (milliseconds / (1000 * 60 * 60 * 24));
  }
 
  public static void main(String[] args) throws IOException {
    String srcFile = "H:/tmp/data3/inbound.txt";
    /* Output file path */
    String destFile= "h:/tmp/data3/out_inbound.parquet";
    CreateParquetFile cls = new CreateParquetFile(destFile);
    cls.writeToParquetFile(srcFile);
    System.out.println("OK.");
  }
}


Converting the text data into a Parquet file produces a file of 1157 MB.
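The 1157 MB above uses Snappy with the default writer settings. Parquet's file size also depends on the writer configuration; the sketch below is a hypothetical variant of getWriter() in CreateParquetFile that makes the main knobs explicit (the codec, row-group size and dictionary setting are illustrative choices, not tuned values):

  /* Hypothetical variant of getWriter() above with explicit tuning knobs */
  private static ParquetWriter<Group> getTunedWriter(String filePath) throws IOException {
     return ExampleParquetWriter.builder(new Path(filePath))
        .withWriteMode(ParquetFileWriter.Mode.CREATE)
        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
        .withCompressionCodec(CompressionCodecName.GZIP)   // GZIP usually compresses better than SNAPPY but writes more slowly
        .withRowGroupSize(128 * 1024 * 1024)               // row-group size in bytes
        .withDictionaryEncoding(true)                      // helps low-cardinality columns such as store
        .withConf(new Configuration())
        .withType(FILE_SCHEMA)
        .build();
  }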

4. Reading the Data

Each record read from a file is converted according to its field data types and stored in an Object[] array; the time to traverse the esProc composite table, ORC and Parquet files in this way is then measured.

4.1 esProc composite table

The SPL script that reads the data:

  | A | B
1 | =now() | 0
2 | =file("H:/tmp/data3/inbound.ctx").open() |
3 | =A2.cursor() |
4 | for A3, 10000 | >B1=B1+A4.len()
5 | =A2.close() |
6 | >output("total=" / B1) |
7 | =interval@ms(A1,now()) |


Traversing the composite table data with a cursor takes 11.053 seconds.
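The SPL script is the native way to traverse a composite table. For completeness, the same traversal can also be driven from Java through esProc's JDBC driver; the sketch below assumes the usual driver class com.esproc.jdbc.InternalDriver and the jdbc:esproc:local:// URL, and that a single SPL expression can be passed as the query text (details may differ between esProc versions and deployments):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ReadCtxViaJdbc {
   public static void main(String[] args) throws Exception {
      // Assumed esProc JDBC driver class and local connection URL.
      Class.forName("com.esproc.jdbc.InternalDriver");
      try (Connection con = DriverManager.getConnection("jdbc:esproc:local://");
           Statement st = con.createStatement();
           // An SPL expression passed as the query: open the composite table and return a cursor over all rows.
           ResultSet rs = st.executeQuery("=file(\"H:/tmp/data3/inbound.ctx\").open().cursor()")) {
         long count = 0;
         while (rs.next()) {
            count++;   // each row could be unpacked into an Object[] as in the other tests
         }
         System.out.println("rows = " + count);
      }
   }
}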

4.2 ORC

Read the ORC file with a Java program:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DateColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import java.time.LocalDate;
  
public class InboundReader {
   public static void main(String[] args) throws Exception {
     try {
       long start = System.currentTimeMillis();
       String sfile = "H:/tmp/data3/out_inbound.orc";
       Configuration conf = new Configuration();
       Reader reader = OrcFile.createReader(new Path(sfile), OrcFile.readerOptions(conf));
       TypeDescription readSchema = reader.getSchema();
       System.out.println("Row count: " + reader.getNumberOfRows());
       VectorizedRowBatch batch = readSchema.createRowBatch(50000);
       RecordReader rowIterator = reader.rows(reader.options().schema(readSchema));
       LongColumnVector pid = (LongColumnVector) batch.cols[0];
       BytesColumnVector store = (BytesColumnVector) batch.cols[1];
       LongColumnVector product = (LongColumnVector) batch.cols[2];
       DateColumnVector indate = (DateColumnVector) batch.cols[3];
       LongColumnVector total = (LongColumnVector) batch.cols[4];
       long sum = 0;
  	
       String[] cols = new String[]{"pid","store", "product","indate","num"};
       Object[] items = new Object[cols.length];
  
       while (rowIterator.nextBatch(batch)) {				
         for (int row = 0; row < batch.size; ++row) {	
            int productRow = product.isRepeating ? 0 : row;
            int indateRow = indate.isRepeating ? 0 : row;
            int totalRow = total.isRepeating ? 0 : row;
            items[0] = pid.vector[row];
            items[1] = store.toString(row);
            items[2] = (product.noNulls || !product.isNull[productRow] ? product.vector[productRow] : 0);
            items[3] = (indate.noNulls || !indate.isNull[indateRow] ? LocalDate.ofEpochDay(indate.vector[indateRow]).toString() : null);
            items[4] = (total.noNulls || !total.isNull[totalRow] ? total.vector[totalRow] : 0);
          }
          sum+=batch.size;
        }
  
        rowIterator.close();
        reader.close();
        System.out.println("sum = "+ sum+"; Time = "+(System.currentTimeMillis() - start));
      }catch(Exception e) {
        System.out.println(e);
      }	
   }
}


Traversing the ORC file takes 19.595 seconds.
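The 19.595 seconds is for reading all five columns. Since ORC is a columnar format, the reader can also be told to skip columns that are not needed; the fragment below is a sketch that plugs into the reader code above (the column ids follow the schema order pid, store, product, indate, num, with id 0 being the root struct):

       // Read only pid and indate; the other columns are skipped on disk.
       boolean[] include = new boolean[readSchema.getMaximumId() + 1];
       include[0] = true;   // the root struct itself
       include[1] = true;   // pid
       include[4] = true;   // indate
       RecordReader rowIterator = reader.rows(reader.options().schema(readSchema).include(include));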
 

4.3 Parquet

Likewise, read the Parquet file with a Java program:
import java.time.LocalDate;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.GroupValueSource;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.GroupType;
import com.scudata.common.Logger;
  
public class ParquetReaderTest {
   public static void main(String[] args) throws Exception {
      try {
      	long nStart = System.currentTimeMillis();
      	ParquetReaderTest cls = new ParquetReaderTest();
      	String srcFile = "h:/tmp/data3/out_inbound.parquet";
      	cls.parquetRead(srcFile);
      	System.out.println("time = " + (System.currentTimeMillis() - nStart));
      } catch (Exception e) {
      	e.printStackTrace();
      }
	}
  
    private void parquetRead(String srcFile) throws Exception {
      long sum = 0;
      Map<String, String> map = new LinkedHashMap<>();
      map.put("pid", "long");
      map.put("store", "string");
      map.put("product", "int");
      map.put("indate", "date");
      map.put("num", "long");
  
      GroupReadSupport readSupport = new GroupReadSupport();
      ParquetReader<Group> reader = new ParquetReader<Group>(new Path(srcFile), readSupport);
      Group line = null;
      Object[] items = null;
      while ((line = reader.read()) != null) { 
        items = readGroup(line, map.values());
      	sum++;
      	if (sum % 5000000 == 0) {  
          System.out.println("idx = " + sum);
      	}
      }
      reader.close();
      System.out.println("sum=" + sum);
  }
  
  private Object[] readGroup(Group g, Collection<String> colsType) throws Exception {
      Object[] items = new Object[colsType.size()];
      try {
      	String sType = null;
      	Iterator<String> iter = colsType.iterator();
      	int n = 0;
      	while (iter.hasNext()) {
            sType = iter.next();
            Type colType = g.getType().getFields().get(n);
            if (colType.isPrimitive()) {
            	if (sType.equals("string")) {
                  items[n] = g.getString(n, 0);
            	} else if (sType.equals("int")) {
                  items[n] = g.getInteger(n, 0);
            	} else if (sType.equals("long")) {
                  items[n] = g.getLong(n, 0);
            	} else if (sType.equals("double")) {
                  items[n] = g.getDouble(n, 0);
            	} else if (sType.equals("float")) {
                  items[n] = g.getFloat(n, 0);
            	} else if (sType.equals("boolean")) {
                  items[n] = g.getBoolean(n, 0);
            	} else if (sType.equals("int96")) {
                  items[n] = g.getInt96(n, 0);
            	} else if (sType.equals("date")) {
                  items[n] = LocalDate.ofEpochDay(g.getInteger(n, 0)).toString();
            	}
            } else {
            	GroupValueSource subG = g.getGroup(n, 0);
            	GroupType pt = subG.getType();
            	int colSize = pt.getFieldCount();
            	System.out.println("subColSize = " + colSize);
            }
            n++;   // advance to the next column in both branches
      	}
      } catch (Exception e) {  
        Logger.error("readGroup:: " + e);
      }
  
      return items;
   }
}


Traversing the Parquet file takes 85.370 seconds. The test also shows that when the date type is stored as an int96 binary value, converting it to a date is even slower.
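For reference on the INT96 remark: an INT96 timestamp is a 12-byte binary value, commonly laid out as 8 little-endian bytes of nanoseconds within the day followed by a 4-byte Julian day number, so every value needs buffer decoding and calendar arithmetic before it becomes a date. A sketch of that decoding (the class is hypothetical; 2440588 is the Julian day number of 1970-01-01):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.LocalDate;
import org.apache.parquet.io.api.Binary;

public class Int96ToDate {
   // Julian day number of the Unix epoch (1970-01-01).
   private static final int JULIAN_EPOCH_DAY = 2440588;

   /* Decodes a Parquet INT96 timestamp (as returned by Group.getInt96) into a LocalDate; the time of day is discarded. */
   public static LocalDate int96ToLocalDate(Binary int96) {
      ByteBuffer buf = int96.toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
      long nanosOfDay = buf.getLong();   // first 8 bytes: nanoseconds within the day (unused here)
      int julianDay = buf.getInt();      // last 4 bytes: Julian day number
      return LocalDate.ofEpochDay(julianDay - JULIAN_EPOCH_DAY);
   }
}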

5. Summary and Conclusion

                        Composite table   ORC      Parquet   TXT
Compression codec       lz4               snappy   snappy    -
File size (MB)          608               513.9    1157      8448
1st traversal time (s)  15.519            28.517   93.972
2nd traversal time (s)  10.841            18.034   76.114
3rd traversal time (s)  11.053            19.595   85.370


As the table shows, compared with the uncompressed text data, all three columnar formats achieve a good compression ratio and save a great deal of storage space. ORC compresses best; the esProc composite table is slightly behind but close (about 20% larger), while Parquet's compression is clearly much worse, its file more than twice the size of ORC's.

In read speed, the esProc composite table is the fastest, almost twice as fast as ORC and far ahead of Parquet. From another angle, this suggests that Parquet may already be an outdated file format that is being replaced by ORC.

Taking both compression ratio and read performance into account, the esProc composite table has the greater overall advantage. If the composite table's flexible segmentation and ordered-location capabilities are also taken into account, its performance advantage for big data computing becomes even more pronounced.