Embulk是一款开源的批处理框架,它主要用于异构数据库,文件存储以及云服务之间的数据传输工具。特色:
Embulk使用Yaml进行配置,主要包括下面几个section:
in:从数据源读取数据数据(基于文件(ftp等)和基于记录(数据库等))
parser:如果数据源是文件,parser解析文件格式(基于文件)
decoder:用来解压缩和加解密数据(基于文件)
out:输出数据到目标数据源
formatter:将数据输出成相应文件格式(基于文件)
encoder:压缩数据或加解密数据(基于数据)
filters:输出后的数据过滤(可选)
exec:执行引擎(可选)
官方网站模板如下:
in:
type: file
path_prefix: ./mydata/csv/
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: '"'
null_string: 'NULL'
skip_header_lines: 1
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
filters:
- type: speedometer
speed_limit: 250000
out:
type: stdout
每个插件的具体的模板可以参考: http://www.embulk.org/plugins/
安装:
curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc
下载elasticsearch的输出插件,file的插件在安装的时候自带。
embulk gem install embulk-output-elasticsearch
创建一个简单的yml文件:seed.yml
in:
type: file
path_prefix: ./mydata/csv/
out:
type: elasticsearch
index: embulk
index_type: embulk
nodes:
- host: localhost
随后embulk会自动猜测你需要的Yaml文件:
embulk guess ./mydata/seed.yml -o config.yml
如下:
in:
type: file
path_prefix: ./mydata/csv/
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: ''
null_string: 'NULL'
skip_header_lines: 1
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
out:
type: elasticsearch
index: embulk
index_type: embulk
nodes:
- {host: localhost}
你看Embulk帮你补全了大部分,并在默认目录下生成了config.yml 如果你的文件中牵扯到时区的话,可以加上:
parser:
default_timezone: 'Asia/Tokyo'
之后就可以执行yml文件:
embulk run config.yml -c diff.yml
diff.yml记录了上次最新跑过的数据,下一次会自动更新,并不会再运行记录在diff.yml文件里的文件
in: {last_path: mydata/csv/sample_01.csv.gz}
out: {}
当数据中途因为各种原因断了的时候,Embulk支持重跑,只需要运行时加上resume-state.yml的生成路径
embulk run config.yml -r resume-state.yml
事务失败后,再次运行:
embulk run config.yml -r resume-state.yml
Embulk会根据resume-state.yml记录的状态重跑数据 如果不再需要事务支持,只需要删除即可。
embulk cleanup config.yml -r resume-state.yml