Apache Calcite是一款开源的动态数据管理框架,它提供了标准的 SQL 语言、多种查询优化和连接各种数据源的能力,但不包括数据存储、处理数据的算法和存储元数据的存储库。
Calcite 之前的名称叫做optiq,optiq 起初在 Hive 项目中,为 Hive 提供基于成本模型的优化,即CBO(Cost Based Optimizatio)。2014 年 5 月 optiq 独立出来,成为 Apache 社区的孵化项目,2014 年 9 月正式更名为 Calcite。
Calcite 的目标是“one size fits all(一种方案适应所有需求场景)”,希望能为不同计算平台和数据源提供统一的查询引擎。
一般来说Calcite解析SQL有以下几步:
INSERT INTO tmp_node
SELECT s1.id1, s1.id2, s2.val1
FROM source1 as s1 INNER JOIN source2 AS s2
ON s1.id1 = s2.id1 and s1.id2 = s2.id2 where s1.val1 > 5 and s2.val2 = 3;
LogicalTableModify(table=[[TMP_NODE]], operation=[INSERT], flattened=[false])
LogicalProject(ID1=[$0], ID2=[$1], VAL1=[$7])
LogicalFilter(condition=[AND(>($2, 5), =($8, 3))])
LogicalJoin(condition=[AND(=($0, $5), =($1, $6))], joinType=[INNER])
LogicalTableScan(table=[[SOURCE1]])
LogicalTableScan(table=[[SOURCE2]])
谓词下推,投影下推,关系代数定律优化
LogicalTableModify(table=[[TMP_NODE]], operation=[INSERT], flattened=[false])
LogicalProject(ID1=[$0], ID2=[$1], VAL1=[$7])
LogicalJoin(condition=[AND(=($0, $5), =($1, $6))], joinType=[inner])
LogicalFilter(condition=[=($4, 3)])
LogicalProject(ID1=[$0], ID2=[$1], ID3=[$2], VAL1=[$3], VAL2=[$4],VAL3=[$5])
LogicalTableScan(table=[[SOURCE1]])
LogicalFilter(condition=[>($3,5)])
LogicalProject(ID1=[$0], ID2=[$1], ID3=[$2], VAL1=[$3], VAL2=[$4],VAL3=[$5])
LogicalTableScan(table=[[SOURCE2]])
如上,节点树中的最后节点均为LogicalTableScan,假设我们不参与(LogicalTableScan)Calcite的查询过程,即不做SQL解析,不做优化,只要把它接入进来,实际Calcite是可以工作的,无非就是可能会有扫全表、数据全部加载到内存里等问题,所以实际中我们可能会参与全部(Translatable)或部分工作(FilterableTable),覆盖Calcite的一些执行计划或过滤条件,让它能更高效的工作。
值得一提的是,Calcite支持异构数据源查询,比如数据存在es和mysql,可以通过写sql join之类的操作,让calcite分别先从不同的数据源查询数据,然后再在内存里进行合并计算;另外,它本身提供了许多优化规则,也支持我们自定义优化规则,来优化整个查询。
a simple implementation of Table, using the ScannableTable interface, that enumerates all rows directly
这种方式基本不会用,原因是查询数据库的时候没有任何条件限制,默认会先把全部数据拉到内存,然后再根据filter条件在内存中过滤。
使用方式:实现Enumerable scan(DataContext root);
,该函数返回Enumerable对象,通过该对象可以一行行的获取这个Table的全部数据。
a more advanced implementation that implements FilterableTable, and can filter out rows according to simple predicates
初级用法,我们能拿到filter条件,即能再查询底层DB时进行一部分的数据过滤,一般开始介入calcite可以用这种方式(translatable方式学习成本较高)。
使用方式:实现Enumerable scan(DataContext root, List filters )
。
如果当前类型的“表”能够支持我们自己写代码优化这个过滤器,那么执行完自定义优化器,可以把该过滤条件从集合中移除,否则,就让calcite来过滤,简言之就是,如果我们不处理List filters
,Calcite也会根据自己的规则在内存中过滤,无非就是对于查询引擎来说查的数据多了,但如果我们可以写查询引擎支持的过滤器(比如写一些hbase、es的filter),这样在查的时候引擎本身就能先过滤掉多余数据,更加优化。提示,即使走了我们的查询过滤条件,可以再让calcite帮我们过滤一次,比较灵活。
advanced implementation of Table, using TranslatableTable, that translates to relational operators using planner rules.
高阶用法,有些查询用上面的方式都支持不了或支持的不好,比如join、聚合、或对于select的字段筛选等,需要用这种方式来支持,好处是可以支持更全的功能,代价是所有的解析都要自己写,“承上启下”,上面解析sql的各个部件,下面要根据不同的DB(esmysqldrudi..)来写不同的语法查询。
当使用ScannableTable的时候,我们只需要实现函数Enumerable scan(DataContext root);
,该函数返回Enumerable对象,通过该对象可以一行行的获取这个Table的全部数据(也就意味着每次的查询都是扫描这个表的数据,我们干涉不了任何执行过程);当使用FilterableTable的时候,我们需要实现函数Enumerable scan(DataContext root, List filters );
参数中多了filters数组,这个数据包含了针对这个表的过滤条件,这样我们根据过滤条件只返回过滤之后的行,减少上层进行其它运算的数据集;当使用TranslatableTable的时候,我们需要实现RelNode toRel( RelOptTable.ToRelContext context, RelOptTable relOptTable);
,该函数可以让我们根据上下文自己定义表扫描的物理执行计划,至于为什么不在返回一个Enumerable对象了,因为上面两种其实使用的是默认的执行计划,转换成EnumerableTableAccessRel算子,通过TranslatableTable我们可以实现自定义的算子,以及执行一些其他的rule,Kylin就是使用这个类型的Table实现查询。
如果你的数据源不在官方的支持列表中,或者官方的支持不能满足你的需求,那么则需要自己实现源接入。
<!--calcite核心包-->
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.19.0</version>
</dependency>
<!--项目用-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.54</version>
</dependency>
<!--项目用-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
</dependency>
calcite中,引入一个数据库通常是通过注册一个SchemaFactory
接口实现类来实现。SchemaFactory
中只有一个方法,就是生成Schema
。Schema
最重要的功能是获取所有Table
。Table
有两个功能,一个是获取所有字段的类型,另一个是得到Enumerable
迭代器用来读取数据。
如果将你的数据源引入calcite,一般情况下是使用一个配置文件,以下是配置文件的demo。
{
"version": "1.0",
"defaultSchema": "TEST",
"schemas": [
{
"name": "TEST",
"type": "custom",
"factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
"operand": {
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8",
"jdbcDriver":"com.mysql.cj.jdbc.Driver",
"jdbcUser":"test",
"jdbcPassword":"test"
}
}
]
}
这里我们先生成一个CSV文件,后边的操作就是通过在calcite中调用SQL访问CSV中的数据。
TEST01.csv
ID:VARCHAR,NAME1:VARCHAR,NAME2:VARCHAR
0,first,second
1,hello,world
CsvSchemaFactory类
package com.calcite.csv;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;
import java.util.Map;
public class CsvSchemaFactory implements SchemaFactory {
/**
* parentSchema 他的父节点,一般为root
* name 数据库的名字,它在model中定义的
* operand 也是在mode中定义的,是Map类型,用于传入自定义参数。
* */
@Override
public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
return new CsvSchema(String.valueOf(operand.get("dataFile")));
}
}
CsvSchema类
package com.calcite.csv;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.util.Source;
import org.apache.calcite.util.Sources;
import java.net.URL;
import java.util.Map;
public class CsvSchema extends AbstractSchema {
private Map<String, Table> tableMap;
private String dataFile;
public CsvSchema(String dataFile) {
this.dataFile = dataFile;
}
@Override
protected Map<String, Table> getTableMap() {
URL url = Resources.getResource(dataFile);
Source source = Sources.of(url);
if (tableMap == null) {
final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
builder.put(this.dataFile.split("\\.")[0],new CsvTable(source));
// 一个数据库有多个表名,这里初始化,大小写要注意了,TEST01是表名。
tableMap = builder.build();
}
return tableMap;
}
}
CsvTable类
package com.calcite.csv;
import com.google.common.collect.Lists;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Source;
import java.io.*;
import java.util.List;
public class CsvTable extends AbstractTable implements ScannableTable {
private Source source;
public CsvTable(Source source) {
this.source = source;
}
/**
* 获取字段类型
*/
@Override
public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
JavaTypeFactory typeFactory = (JavaTypeFactory)relDataTypeFactory;
List<String> names = Lists.newLinkedList();
List<RelDataType> types = Lists.newLinkedList();
try {
BufferedReader reader = new BufferedReader(new FileReader(source.file()));
String line = reader.readLine();
List<String> lines = Lists.newArrayList(line.split(","));
lines.forEach(column -> {
String name = column.split(":")[0];
String type = column.split(":")[1];
names.add(name);
types.add(typeFactory.createSqlType(SqlTypeName.get(type)));
});
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return typeFactory.createStructType(Pair.zip(names, types));
}
@Override
public Enumerable<Object[]> scan(DataContext dataContext) {
return new AbstractEnumerable<Object[]>() {
@Override
public Enumerator<Object[]> enumerator() {
return new CsvEnumerator<>(source);
}
};
}
}
CsvEnumerator类
package com.calcite.csv;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.util.Source;
import java.io.BufferedReader;
import java.io.IOException;
public class CsvEnumerator <E> implements Enumerator<E> {
private E current;
private BufferedReader br;
public CsvEnumerator(Source source) {
try {
this.br = new BufferedReader(source.reader());
this.br.readLine();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public E current() {
return current;
}
@Override
public boolean moveNext() {
try {
String line = br.readLine();
if(line == null){
return false;
}
current = (E)line.split(","); // 如果是多列,这里要多个值
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* 出现异常走这里
* */
@Override
public void reset() {
System.out.println("报错了兄弟,不支持此操作");
}
/**
* InputStream流在这里关闭
* */
@Override
public void close() {
}
}
model.json
{
"version": "1.0",
"defaultSchema": "TEST_CSV",
"schemas": [
{
"name": "TEST_CSV",
"type": "custom",
"factory": "com.calcite.csv.CsvSchemaFactory",
"operand": {
"dataFile": "TEST01.csv"
}
}
]
}
Main方法调用
package com.calcite;
import com.alibaba.fastjson.JSON;
import com.calcite.util.ReourceUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.sql.*;
import java.util.List;
import java.util.Map;
public class Client {
/**
* 测试的时候用字符串 defaultSchema 默认数据库 name 数据库名称 type custom factory
* 请求接收类,该类会实例化Schema也就是数据库类,Schema会实例化Table实现类,Table会实例化数据类。
* operand 动态参数,ScheamFactory的create方法会接收到这里的数据
*/
public static void main(String[] args) {
try {
// 用文件的方式
//URL url = Client.class.getResource("/model.json");
//String str = URLDecoder.decode(url.toString(), "UTF-8");
//Properties info = new Properties();
//info.put("model", str.replace("file:", ""));
//Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
// 字符串方式
String model = ReourceUtil.getResourceAsString("model.json");
Connection connection = DriverManager.getConnection("jdbc:calcite:model=inline:" + model);
Statement statement = connection.createStatement();
test1(statement);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* CSV文件读取
* @param statement
* @throws Exception
*/
public static void test1(Statement statement) throws Exception {
ResultSet resultSet = statement.executeQuery("select * from test_csv.TEST01");
System.out.println(JSON.toJSONString(getData(resultSet)));
}
public static List<Map<String,Object>> getData(ResultSet resultSet)throws Exception{
List<Map<String,Object>> list = Lists.newArrayList();
ResultSetMetaData metaData = resultSet.getMetaData();
int columnSize = metaData.getColumnCount();
while (resultSet.next()) {
Map<String, Object> map = Maps.newLinkedHashMap();
for (int i = 1; i < columnSize + 1; i++) {
map.put(metaData.getColumnLabel(i), resultSet.getObject(i));
}
list.add(map);
}
return list;
}
}
在4.2的演示中,我们能够使用SQL查询CSV文件中的数据。接下来,我们再定义一种内存数据源,主要作用是演示两种数据源间的关联查询。
MemSchemaFactory类
package com.calcite.memory;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;
import java.util.Map;
public class MemSchemaFactory implements SchemaFactory {
@Override
public Schema create(SchemaPlus schemaPlus, String s, Map<String, Object> map) {
return new MemSchema(map);
}
}
MemSchema类
package com.calcite.memory;
import com.google.common.collect.ImmutableMap;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import java.util.Map;
public class MemSchema extends AbstractSchema {
private Map<String, Object> map;
private Map<String, Table> tableMap;
public MemSchema(Map<String, Object> map) {
this.map = map;
}
@Override
protected Map<String, Table> getTableMap() {
if (tableMap == null) {
final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
map.forEach((key, value) -> {
builder.put(key, new MemTable(value));
});
tableMap = builder.build();
}
return tableMap;
}
}
MemTable类
package com.calcite.memory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.parser.Feature;
import com.google.common.collect.Lists;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class MemTable extends AbstractTable implements ScannableTable {
private List<Map<String, Object>> list = Lists.newLinkedList();
public MemTable(Object list) {
if (list instanceof List) {
((List)list).forEach(o -> {
this.list.add(
JSON.parseObject(JSON.toJSONString(o),
new TypeReference<Map<String, Object>>() {},
Feature.OrderedField));
});
}
}
@Override
public Enumerable<Object[]> scan(DataContext dataContext) {
return new AbstractEnumerable<Object[]>() {
@Override
public Enumerator<Object[]> enumerator() {
return new MemEnumerator<Object[]>(list);
}
};
}
@Override
public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
JavaTypeFactory typeFactory = (JavaTypeFactory)relDataTypeFactory;
List<String> names = Lists.newLinkedList();
List<RelDataType> types = Lists.newLinkedList();
if (list.size() != 0) {
list.get(0).forEach((key, value) -> {
names.add(key);
types.add(typeFactory.createSqlType(SqlTypeName.get("VARCHAR")));
});
}
return typeFactory.createStructType(Pair.zip(names, types));
}
}
MemEnumerator类
package com.calcite.memory;
import com.google.common.collect.Lists;
import org.apache.calcite.linq4j.Enumerator;
import java.util.List;
import java.util.Map;
public class MemEnumerator<E> implements Enumerator<E> {
private List<Map<String, Object>> list = Lists.newLinkedList();
private int index = -1;
private E e;
public MemEnumerator(List<Map<String, Object>> list) {
this.list = list;
}
@Override
public E current() {
return e;
}
@Override
public boolean moveNext() {
if (index+1 >= list.size()){
return false;
}else {
e = (E)list.get(index+1).values().toArray();
index++;
return true;
}
}
@Override
public void reset() {
index = -1;
e = null;
}
@Override
public void close() {
}
}
model.json
{
"version": "1.0",
"defaultSchema": "TEST_CSV",
"schemas": [
{
"name": "TEST_CSV",
"type": "custom",
"factory": "com.calcite.csv.CsvSchemaFactory",
"operand": {
"dataFile": "TEST01.csv"
}
},
{
"name": "TEST_MEM",
"type": "custom",
"factory": "com.calcite.memory.MemSchemaFactory",
"operand": {
"MEM_TABLE_1": [
{
"ID": 0,
"MEM_STR": "str0"
},
{
"ID": 1,
"MEM_STR": "str1"
},
{
"ID": 2,
"MEM_STR": "str2"
}
]
}
}
]
}
Main方法调用
package com.calcite;
import com.alibaba.fastjson.JSON;
import com.calcite.util.ReourceUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.sql.*;
import java.util.List;
import java.util.Map;
public class Client {
/**
* 测试的时候用字符串 defaultSchema 默认数据库 name 数据库名称 type custom factory
* 请求接收类,该类会实例化Schema也就是数据库类,Schema会实例化Table实现类,Table会实例化数据类。
* operand 动态参数,ScheamFactory的create方法会接收到这里的数据
*/
public static void main(String[] args) {
try {
// 用文件的方式
//URL url = Client.class.getResource("/model.json");
//String str = URLDecoder.decode(url.toString(), "UTF-8");
//Properties info = new Properties();
//info.put("model", str.replace("file:", ""));
//Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
// 字符串方式
String model = ReourceUtil.getResourceAsString("model.json");
Connection connection = DriverManager.getConnection("jdbc:calcite:model=inline:" + model);
Statement statement = connection.createStatement();
test2(statement);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* CSV文件读取
* @param statement
* @throws Exception
*/
public static void test1(Statement statement) throws Exception {
ResultSet resultSet = statement.executeQuery("select * from test_csv.TEST01");
System.out.println(JSON.toJSONString(getData(resultSet)));
}
/**
* CSV文件与内存文件关联读取
* @param statement
* @throws Exception
*/
public static void test2(Statement statement) throws Exception {
ResultSet resultSet1 = statement.executeQuery("select csv1.id as cid,csv1.name1 as cname ,mem1.id as mid,mem1.mem_str as mstr from test_csv.TEST01 as csv1 left join test_mem.mem_table_1 as mem1 on csv1.id = mem1.id");
System.out.println(JSON.toJSONString(getData(resultSet1)));
}
public static List<Map<String,Object>> getData(ResultSet resultSet)throws Exception{
List<Map<String,Object>> list = Lists.newArrayList();
ResultSetMetaData metaData = resultSet.getMetaData();
int columnSize = metaData.getColumnCount();
while (resultSet.next()) {
Map<String, Object> map = Maps.newLinkedHashMap();
for (int i = 1; i < columnSize + 1; i++) {
map.put(metaData.getColumnLabel(i), resultSet.getObject(i));
}
list.add(map);
}
return list;
}
}
calcite对于没有高并发
、低延时
的多数据源间数据有着天然的优势。但需要注意的是,如果一个表中数据量特别大,大到读取速度很慢或内存无法容纳,那么务必在操作该表数据时加入尽可能多的筛选条件,如果自定义实现LogicalTableScan
,最好也是实现FilterableTable
,从而减少calcite在内存中操作数据行的量。
参考: