Comparison of esProc Composite Table, ORC, and Parquet
1. Introduction
Big data technology has given rise to a series of columnar storage formats, and a suitable storage scheme is the foundation of high-performance computing. This article compares three open-source columnar file formats, the esProc composite table, ORC, and Parquet, from an application perspective, focusing on data compression and read performance.
2. Preparing the Data
Structure of the table inbound:
No | Field name | Field type |
1 | pid | long |
2 | store | string |
3 | product | int |
4 | indate | date |
5 | num | int |
Generate 200 million rows of inbound data as a txt text file, then store the same data locally as an esProc composite table, an ORC file, and a Parquet file, compare their sizes, and test the read performance of each. Following the way Hive tables store dates, the indate field is converted to a numeric value for storage.
An esProc SPL script is used to generate the txt text data, which is then converted into the other file formats for testing.
3. Generating the Data
Generate unordered data for the inbound table and save it as an esProc composite table, an ORC file, and a Parquet file respectively.
3.1 TXT data
The SPL script that generates the TXT data is as follows:
A | B | C | D | |
1 | /uuid (segmented data) | /segment index | /segment length | /sequence number within segment |
2 | [] | 1 | 0 | 0 |
3 | [[300000,400000],[50000, 100000],[600000,700000],[100000,200000],[200000,300000],[1, 50000],[400000,500000], | 0 | /B3: segment start position |
4 | for 220 | for A3 | =[[B4(1)+(A4-1)*1000000, B4(2)+(A4-1)*1000000]] | |
5 | =A2=A2.insert(0, C4) | |||
6 | 1000 | 100 | 1500 | 10000 |
7 | 200000000 | 20 | 60 | 10 |
8 | 0 | =date(2015,10,1) | =date(2016,9,30) | 100 |
9 | =create(pid, store,product,indate,num) | |||
10 | =file("H:/tmp/inbound.txt") | |||
11 | func genOne (store,product) | ="store_id_"+mid(string(store+1000000),2) | ||
12 | =B7+rand(C7-B7) | =to(1000).(rand(interval(B8,C8))).id@u().m(to(B12)).sort() | ||
13 | for B12 | =elapse(B8,C12(B13)) | =D7+rand(D8-D7) | |
14 | >A8 = A8+1 | |||
15 | if D2>=C2 | >B3=A2(B2)(1) | ||
16 | >C2 = A2(B2)(2)-B3 | |||
17 | >B2=B2+1 | |||
18 | =D2=0 | |||
19 | >A9.insert(0,(D2+B3), B11,product,C13,D13) | |||
20 | >D2 = D2+1 | |||
21 | for D6 | for A6 | =func(genOne,A21,B21) | |
22 | if (A8 >= A7) | >A10.export@a(A9.cursor()) | ||
23 | break A21 | |||
24 | >A10.export@at(A9.cursor()) | >A9.reset() |
Generating 200 million rows into the txt file produces a file of 8448 MB. The A2 sequence is used to help generate unordered, non-repeating numbers that serve as the key values.
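The same idea can be expressed outside SPL. The following is a minimal Java sketch (hypothetical, not one of the test programs) of producing keys that are globally unique yet unordered by emitting pre-shuffled sub-ranges block by block; the sub-range list is shortened here, while the script above uses a longer list covering each block of one million ids:
public class ShuffledKeySketch {
    public static void main(String[] args) {
        long blockSize = 1_000_000L;
        // each block emits the same scrambled sub-ranges, shifted by the block's offset
        long[][] subRanges = { {300_000, 400_000}, {50_000, 100_000}, {1, 50_000} };
        long count = 0, sample = -1;
        for (long block = 0; block < 3; block++) {      // the SPL script loops over 220 blocks
            for (long[] r : subRanges) {
                for (long k = r[0]; k < r[1]; k++) {
                    long key = k + block * blockSize;   // unique overall, unordered globally
                    if (sample < 0) sample = key;       // remember the first key as a sample
                    count++;
                }
            }
        }
        System.out.println("keys generated: " + count + ", first key: " + sample);
    }
}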
3.2 esProc composite table
The SPL script that generates the esProc composite table data is as follows:
A | B | C |
1 | =now() | 0 | |
2 | =file("H:/tmp/ inbound.txt").cursor@t() | ||
3 | =file("H:/tmp/data3/ inbound.ctx").create@y(#pid, store, product, indate,num) |
||
4 | for A2, 50000 | >A3.append(A4) | |
5 | =B1=B1+A4.len() | ||
6 | if B1%5000000==0 | >output(B1) | |
7 | >A3.close() | ||
8 | >output("sum =" / B1) | ||
9 | =interval@ms(A1,now()) |
Converting the text data into composite table data produces a ctx file of 608 MB.
3.3 ORC
A Java program is used to convert the txt text data into an ORC file.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DateColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.CompressionKind;
import com.scudata.common.Logger;
import com.scudata.dm.BaseRecord;
import com.scudata.dm.FileObject;
import com.scudata.dm.Sequence;
import com.scudata.dm.cursor.ICursor;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Date;
public class CreateOrcFile {
private TypeDescription m_schema;
Writer m_writer;
public CreateOrcFile (String destFile){
try {
/* define the table schema */
String struct = "struct<pid:int,store:string,product:int,indate:date,num:int>";
m_schema = TypeDescription.fromString(struct);
File f = new File(destFile);
if (f.exists()) {
f.delete();
}
m_writer = getWriter(destFile);
} catch (IOException e) {
Logger.error("CreateOrcFile:: "+e);
}
}
public static void main(String[] args) throws IOException {
String srcFile = "H:/tmp/data3/inbound.txt";
String destFile = "h:/tmp/data3/out_inbound.orc";
CreateOrcFile cls = new CreateOrcFile(destFile);
cls.writeToOrcFile(srcFile);
}
private void writeToOrcFile(String srcFile)
{
try {
Sequence tab = null;
FileObject oRet = new FileObject(srcFile);
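// IParam, ParamParser, CreateCursor and Context are helper classes assumed to come from the esProc (com.scudata) library;
// they build an SPL cursor over the comma-separated txt file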
String exp = String.format(";,\",\"");
IParam param = ParamParser.parse(exp, null, null);
ICursor c = CreateCursor.createCursor("", oRet, null,param,"", new Context());
long total = 0;
int unitSize = 10000;
VectorizedRowBatch batch = m_schema.createRowBatch();
LongColumnVector pid = (LongColumnVector) batch.cols[0];
BytesColumnVector store = (BytesColumnVector) batch.cols[1];
LongColumnVector product = (LongColumnVector) batch.cols[2];
DateColumnVector indate = (DateColumnVector) batch.cols[3];
LongColumnVector num = (LongColumnVector) batch.cols[4];
while(null!=(tab = (Sequence) c.fetch(unitSize)) ) {
total += tab.length();
for(int i=0; i<tab.length(); i++) {
BaseRecord sq = (BaseRecord)tab.get(i+1);
int row = batch.size++;
pid.vector[row] = Long.parseLong(sq.getFieldValue(0).toString());
byte[] buffer = ((String)sq.getFieldValue(1)).getBytes(StandardCharsets.UTF_8);
store.setRef(row, buffer, 0, buffer.length);
product.vector[row] = (Integer)sq.getFieldValue(2);
Date d = (Date)sq.getFieldValue(3);
indate.vector[row] = dateToDays(d);
num.vector[row] = Long.parseLong(sq.getFieldValue(4).toString());
if (batch.size == batch.getMaxSize()) {
m_writer.addRowBatch(batch);
batch.reset();
}
}
if (total %500000==0) {
System.out.println("idx = "+total);
}
if (tab.length()<unitSize) {
break;
}
}
if (batch.size != 0) {
m_writer.addRowBatch(batch);
}
m_writer.close();
}catch(Exception e) {
System.out.println("aaa: "+e);
}
}
/* convert a date to the number of days since 1970-01-01 */
public static int dateToDays(Date date) {
int days = 0;
ZoneId zoneId = ZoneId.systemDefault();
int offsetSeconds = zoneId.getRules().getOffset(Instant.now()).getTotalSeconds();
long zonems = offsetSeconds * 1000;
long milliseconds = date.getTime()+zonems;
days = (int)(milliseconds / (1000 * 60 * 60 * 24));
return days;
}
/* create the ORC writer */
private Writer getWriter(String filePath) throws IOException {
return OrcFile.createWriter(new Path(filePath),
OrcFile.writerOptions(new Configuration()).
setSchema(m_schema).
compress(CompressionKind.SNAPPY));
}
}
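Note that the dateToDays conversion above reproduces, with manual time-zone offset arithmetic, what java.time can compute directly. The following is a minimal sketch of a conversion that yields the same value (the local calendar date as days since 1970-01-01); the class name EpochDays is only illustrative:
import java.time.ZoneId;
import java.util.Date;
public class EpochDays {
    /* The local calendar date of the given Date, expressed as days since 1970-01-01. */
    public static int dateToDays(Date date) {
        return (int) date.toInstant().atZone(ZoneId.systemDefault()).toLocalDate().toEpochDay();
    }
}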
Converting the text data into an ORC file produces a file of 513.9 MB.
3.4 Parquet
As above, a Java program is used to generate the data.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import com.scudata.dm.BaseRecord;
import com.scudata.dm.FileObject;
import com.scudata.dm.Sequence;
import com.scudata.dm.cursor.ICursor;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Date;
public class CreateParquetFile {
private static final MessageType FILE_SCHEMA;
private static final String TABLE_NAME = "inbound_parquet";
ParquetWriter<Group> m_writer;
static {
// define the table schema
Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();
messageTypeBuilder
.optional(PrimitiveType.PrimitiveTypeName.INT64)
.named("pid")
.optional(PrimitiveType.PrimitiveTypeName.BINARY)
.as(LogicalTypeAnnotation.stringType())
.named("store")
.optional(PrimitiveType.PrimitiveTypeName.INT32)
.named("product")
.optional(PrimitiveType.PrimitiveTypeName.INT32)
.named("indate")
.optional(PrimitiveType.PrimitiveTypeName.INT64)
.named("num");
FILE_SCHEMA = messageTypeBuilder.named(TABLE_NAME);
}
public CreateParquetFile (String outFilePath){
try {
File f = new File(outFilePath);
if (f.exists()) {
f.delete();
}
m_writer = getWriter(outFilePath);
} catch (IOException e) {
e.printStackTrace();
}
}
/* create the Parquet writer */
private static ParquetWriter<Group> getWriter(String filePath) throws IOException {
Path path = new Path(filePath);
return ExampleParquetWriter.builder(path)
.withWriteMode(ParquetFileWriter.Mode.CREATE)
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withConf(new Configuration())
.withType(FILE_SCHEMA).build();
}
/* generate the Parquet file */
private void writeToParquetFile(String sfile){
try {
FileObject oRet = new FileObject(sfile);
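// the txt file is parsed through the same esProc cursor helpers (IParam, ParamParser, CreateCursor, Context) as in the ORC writer above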
String exp = String.format(";,\",\"");
IParam param = ParamParser.parse(exp, null, null);
ICursor c = CreateCursor.createCursor("", oRet, null,param,"", new Context());
Sequence tab = null;
long total = 0;
while(null!=(tab = (Sequence) c.fetch(10000)) ) {
total += tab.length();
writeToFile(tab);
if (total %500000==0) {
System.out.println("idx = "+total);
}
if (tab.length()<10000) {
break;
}
}
m_writer.close();
}catch(Exception e) {
System.out.println("aaa: "+e);
}
}
public static int dateToInt(Date date) {
long milliseconds = date.getTime();
return (int) (milliseconds / 1000);
}
/* write one batch of records */
private void writeToFile(Sequence tab) throws IOException {
SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(FILE_SCHEMA);
Group group = null;
for(int i=0;i<tab.length(); i++)
{
BaseRecord sq = (BaseRecord)tab.get(i+1);
try {
group = simpleGroupFactory.newGroup();
group.append("pid", Long.parseLong(sq.getFieldValue(0).toString()) );
group.append("store", (String)sq.getFieldValue(1));
group.append("product", (Integer)sq.getFieldValue(2));
Date d = (Date)sq.getFieldValue(3);
group.append("indate", dateToDays(d));
group.append("num", Long.parseLong(sq.getFieldValue(4).toString()) );
m_writer.write(group);
}catch(Exception e) {
System.out.println("writeToFile: "+e);
}
}
}
public static int dateToDays(Date date) {
int days = 0;
ZoneId zoneId = ZoneId.systemDefault();
int offsetSeconds = zoneId.getRules().getOffset(Instant
.now()).getTotalSeconds();
long zonems = offsetSeconds * 1000;
long milliseconds = date.getTime()+zonems;
days = (int)(milliseconds / (1000 * 60 * 60 * 24));
return days;
}
public static void main(String[] args) throws IOException {
String srcFile = "H:/tmp/data3/inbound.txt";
/* output file path */
String destFile= "h:/tmp/data3/out_inbound.parquet";
CreateParquetFile cls = new CreateParquetFile(destFile);
cls.writeToParquetFile(srcFile);
System.out.println("OK.");
}
}
Converting the text data into a Parquet file produces a file of 1157 MB.
4. Reading the Data
Each record read from a file is converted field by field according to its data type and stored in an Object[] array; the time to traverse all records in this way is measured for the esProc composite table, ORC, and Parquet files.
4.1 esProc composite table
SPL script for reading the data:
A | B |
1 | =now() | 0 |
2 | =file("H:/tmp/data3/inbound.ctx").open() | |
3 | =A2.cursor() | |
4 | for A3, 10000 | >B1=B1+A4.len() |
5 | =A2.close() | |
6 | >output("total=" / B1) | |
7 | =interval@ms(A1,now()) |
Traversing and reading the composite table data with a cursor takes 11.053 seconds.
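For reference, the same composite table can also be traversed from Java through esProc's JDBC interface. The snippet below is a minimal sketch, assuming the esProc JDBC driver (class com.esproc.jdbc.InternalDriver, URL jdbc:esproc:local://) is on the classpath and accepts a plain SPL expression; it is not part of the timed test:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
public class CtxJdbcReadSketch {
    public static void main(String[] args) throws Exception {
        // assumption: the esProc JDBC driver is installed locally
        Class.forName("com.esproc.jdbc.InternalDriver");
        try (Connection con = DriverManager.getConnection("jdbc:esproc:local://");
             Statement st = con.createStatement();
             // execute an SPL expression that opens the composite table and fetches some rows
             ResultSet rs = st.executeQuery("=file(\"H:/tmp/data3/inbound.ctx\").open().cursor().fetch(10000)")) {
            long count = 0;
            while (rs.next()) {
                count++;
            }
            System.out.println("rows fetched: " + count);
        }
    }
}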
4.2 ORC
Reading with a Java program:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DateColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import java.time.LocalDate;
public class InboundReader {
public static void main(String[] args) throws Exception {
try {
long start = System.currentTimeMillis();
String sfile = "H:/tmp/data3/out_inbound.orc";
Configuration conf = new Configuration();
Reader reader = OrcFile.createReader(new Path(sfile), OrcFile.readerOptions(conf));
TypeDescription readSchema = reader.getSchema();
System.out.println("Row count: " + reader.getNumberOfRows());
VectorizedRowBatch batch = readSchema.createRowBatch(50000);
RecordReader rowIterator = reader.rows(reader.options().schema(readSchema));
LongColumnVector pid = (LongColumnVector) batch.cols[0];
BytesColumnVector store = (BytesColumnVector) batch.cols[1];
LongColumnVector product = (LongColumnVector) batch.cols[2];
DateColumnVector indate = (DateColumnVector) batch.cols[3];
LongColumnVector total = (LongColumnVector) batch.cols[4];
long sum = 0;
String[] cols = new String[]{"pid","store", "product","indate","num"};
Object[] items = new Object[cols.length];
while (rowIterator.nextBatch(batch)) {
for (int row = 0; row < batch.size; ++row) {
int productRow = product.isRepeating ? 0 : row;
int indateRow = indate.isRepeating ? 0 : row;
int totalRow = total.isRepeating ? 0 : row;
items[0] = pid.vector[row];
items[1] = store.toString(row);
items[2] = (product.noNulls || !product.isNull[productRow] ? product.vector[productRow] : 0);
items[3] = (indate.noNulls || !indate.isNull[indateRow] ? LocalDate.ofEpochDay(indate.vector[indateRow]).toString() : null);
items[4] = (total.noNulls || !total.isNull[totalRow] ? total.vector[totalRow] : 0);
}
sum+=batch.size;
}
rowIterator.close();
reader.close();
System.out.println("sum = "+ sum+"; Time = "+(System.currentTimeMillis() - start));
}catch(Exception e) {
System.out.println(e);
}
}
}
Traversing and reading the ORC file takes 19.595 seconds.
4.3 Parquet
import java.time.LocalDate;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.GroupValueSource;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.GroupType;
import com.scudata.common.Logger;
public class ParquetReaderTest {
public static void main(String[] args) throws Exception {
try {
long nStart = System.currentTimeMillis();
ParquetReaderTest cls = new ParquetReaderTest();
String srcFile = "h:/tmp/data3/out_inbound.parquet";
cls.parquetRead(srcFile);
System.out.println("time = " + (System.currentTimeMillis() - nStart));
} catch (Exception e) {
e.printStackTrace();
}
}
private void parquetRead(String srcFile) throws Exception {
long sum = 0;
Map<String, String> map = new LinkedHashMap<>();
map.put("pid", "long");
map.put("store", "string");
map.put("product", "int");
map.put("indate", "date");
map.put("num", "long");
GroupReadSupport readSupport = new GroupReadSupport();
ParquetReader<Group> reader = new ParquetReader<Group>(new Path(srcFile), readSupport);
Group line = null;
Object[] items = null;
while ((line = reader.read()) != null) {
items = readGroup(line, map.values());
sum++;
if (sum % 5000000 == 0) {
System.out.println("idx = " + sum);
}
}
reader.close();
System.out.println("sum=" + sum);
}
private Object[] readGroup(Group g, Collection<String> colsType) throws Exception {
Object[] items = new Object[colsType.size()];
try {
String sType = null;
Iterator<String> iter = colsType.iterator();
int n = 0;
while (iter.hasNext()) {
sType = iter.next();
Type colType = g.getType().getFields().get(n);
if (colType.isPrimitive()) {
if (sType.equals("string")) {
items[n] = g.getString(n, 0);
} else if (sType.equals("int")) {
items[n] = g.getInteger(n, 0);
} else if (sType.equals("long")) {
items[n] = g.getLong(n, 0);
} else if (sType.equals("double")) {
items[n] = g.getDouble(n, 0);
} else if (sType.equals("float")) {
items[n] = g.getFloat(n, 0);
} else if (sType.equals("boolean")) {
items[n] = g.getBoolean(n, 0);
} else if (sType.equals("int96")) {
items[n] = g.getInt96(n, 0);
} else if (sType.equals("date")) {
items[n] = LocalDate.ofEpochDay(g.getInteger(n, 0)).toString();
}
} else {
GroupValueSource subG = g.getGroup(n, 0);
GroupType pt = subG.getType();
int colSize = pt.getFieldCount();
System.out.println("subColSize = " + colSize);
}
n++; // advance the field index for primitive and group columns alike
}
} catch (Exception e) {
Logger.error("readGroup:: " + e);
}
return items;
}
}
Traversing and reading the Parquet file takes 85.370 seconds. Testing also shows that if the date type is stored in int96 binary form, converting it to a date is even slower.
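To illustrate where the extra cost comes from, the sketch below decodes an INT96 value by hand, assuming the conventional Parquet INT96 timestamp layout (8 little-endian bytes of nanoseconds within the day followed by a 4-byte little-endian Julian day number); the class is only illustrative and not part of the timed test:
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import org.apache.parquet.io.api.Binary;
public class Int96Decode {
    private static final long JULIAN_EPOCH_DAY = 2440588L; // Julian day number of 1970-01-01
    /* Decode a 12-byte INT96 value (as returned by Group.getInt96) into a LocalDateTime. */
    public static LocalDateTime toDateTime(Binary int96) {
        ByteBuffer buf = int96.toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
        long nanosOfDay = buf.getLong(); // first 8 bytes: nanoseconds within the day
        int julianDay = buf.getInt();    // last 4 bytes: Julian day number
        LocalDate date = LocalDate.ofEpochDay(julianDay - JULIAN_EPOCH_DAY);
        return LocalDateTime.of(date, LocalTime.ofNanoOfDay(nanosOfDay));
    }
}
Compared with reading an INT32 epoch-day value directly, every row adds byte-buffer handling and Julian-day arithmetic, which is consistent with the slower conversion observed above.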
5. Summary and Conclusions
 | Composite table | ORC | Parquet | TXT |
Compression format | lz4 | snappy | snappy | none |
File size (MB) | 608 | 513.9 | 1157 | 8448 |
Read time, 1st run (s) | 15.519 | 28.517 | 93.972 | |
Read time, 2nd run (s) | 10.841 | 18.034 | 76.114 | |
Read time, 3rd run (s) | 11.053 | 19.595 | 85.370 | |
As the table shows, compared with the uncompressed text data, all three columnar formats achieve good compression ratios and save a great deal of storage space. ORC compresses best; the esProc composite table is slightly behind but not by much (its file is about 20% larger), while Parquet compresses noticeably worse, producing a file more than twice the size of the ORC file.
In terms of read speed, the esProc composite table is the fastest, almost twice as fast as ORC and far ahead of Parquet. From another angle, this suggests that Parquet may already be an outdated file format that is being replaced by ORC.
Considering both compression ratio and read performance, the esProc composite table has the greater overall advantage. If features such as flexible segmentation and ordered positioning on the composite table are also taken into account, its performance advantage for big data computing becomes even more pronounced.