因为我们的数据都符合预期,所以没有报出异常,现在故意修改一下源数据
将第二条数据的价格删除,然后再运行ETL脚本
[root@h102 kiba]# vim commandes.csv
[root@h102 kiba]# cat commandes.csv
date_facture;montant_eur;numero_commande
7/3/2015;10,96;FA1986
7/3/2015;;FA1987
8/3/2015;6,41;FA1988
[root@h102 kiba]# bundle exec kiba convert-csv.etl
{
:date_facture => "7/3/2015",
:montant_eur => "10,96",
:amount_eur => 10.96,
:invoice_date => "2015-03-07",
:invoice_number => "FA1986"
}
/root/kiba/common.rb:76:in `block in process': Row lacks value for field montant_eur - {:date_facture=>"7/3/2015", :montant_eur=>nil, :numero_commande=>"FA1987"} (RuntimeError)
from /root/kiba/common.rb:74:in `each'
from /root/kiba/common.rb:74:in `process'
from /usr/local/rvm/gems/ruby-2.3.0/gems/kiba-0.6.1/lib/kiba/runner.rb:35:in `block (3 levels) in process_rows'
from /usr/local/rvm/gems/ruby-2.3.0/gems/kiba-0.6.1/lib/kiba/runner.rb:34:in `each'
from /usr/local/rvm/gems/ruby-2.3.0/gems/kiba-0.6.1/lib/kiba/runner.rb:34:in `block (2 levels) in process_rows'
from /root/kiba/common.rb:11:in `block in each'
from /usr/local/rvm/rubies/ruby-2.3.0/lib/ruby/2.3.0/csv.rb:1748:in `each'
from /usr/local/rvm/rubies/ruby-2.3.0/lib/ruby/2.3.0/csv.rb:1131:in `block in foreach'
from /usr/local/rvm/rubies/ruby-2.3.0/lib/ruby/2.3.0/csv.rb:1282:in `open'
from /usr/local/rvm/rubies/ruby-2.3.0/lib/ruby/2.3.0/csv.rb:1130:in `foreach'
from /root/kiba/common.rb:10:in `each'
from /usr/local/rvm/gems/ruby-2.3.0/gems/kiba-0.6.1/lib/kiba/runner.rb:33:in `block in process_rows'
from /usr/local/rvm/gems/ruby-2.3.0/gems/kiba-0.6.1/lib/kiba/runner.rb:32:in `each'
from /usr/local/rvm/gems/ruby-2.3.0/gems/kiba-0.6.1/lib/kiba/runner.rb:32:in `process_rows'
from /usr/local/rvm/gems/ruby-2.3.0/gems/kiba-0.6.1/lib/kiba/runner.rb:13:in `run'
from /usr/local/rvm/gems/ruby-2.3.0/gems/kiba-0.6.1/lib/kiba/cli.rb:13:in `run'
from /usr/local/rvm/gems/ruby-2.3.0/gems/kiba-0.6.1/bin/kiba:5:in `<top (required)>'
from /usr/local/rvm/gems/ruby-2.3.0/bin/kiba:23:in `load'
from /usr/local/rvm/gems/ruby-2.3.0/bin/kiba:23:in `<main>'
from /usr/local/rvm/gems/ruby-2.3.0/bin/ruby_executable_hooks:15:in `eval'
from /usr/local/rvm/gems/ruby-2.3.0/bin/ruby_executable_hooks:15:in `<main>'
[root@h102 kiba]#
第一条得到成功的处理,但是第二条信息就报错了
报错 Row lacks value for field montant_eur - {:date_facture=>"7/3/2015", :montant_eur=>nil, :numero_commande=>"FA1987"} (RuntimeError)
运行符合预期
[root@h102 kiba]# vim commandes.csv
[root@h102 kiba]# cat commandes.csv
date_facture;montant_eur;numero_commande
7/3/2015;10,96;FA1986
7/3/2015;23,12;FA1987
8/3/2015;6,41;FA1988
[root@h102 kiba]# vim common.rb
[root@h102 kiba]# cat common.rb
require 'csv'
# Kiba source that reads a CSV file and yields each row as a Hash.
class CsvSource
  # @param file [String] path of the CSV file to read
  # @param options [Hash] options forwarded to CSV.foreach
  #   (for example col_sep:, headers:, header_converters:)
  def initialize(file, options = {})
    @file = file
    @options = options
  end

  # Yields every row of the file, converted with +to_hash+.
  # Returns an Enumerator when called without a block.
  def each
    return enum_for(:each) unless block_given?

    # The options must be splatted: CSV.foreach takes them as keyword
    # arguments, and since Ruby 3.0 a positional Hash is no longer
    # converted to keywords (it would be bound to the +mode+ parameter).
    CSV.foreach(@file, **@options) do |row|
      yield row.to_hash
    end
  end
end
require 'awesome_print'
# Registers a pass-through transform that pretty-prints each row with +ap+.
def show_me
  transform do |record|
    # tap hands the record back unchanged, so it stays in the pipeline
    # whatever +ap+ itself returns.
    record.tap { |shown| ap shown }
  end
end
# Transform that parses a decimal-comma number string into a Float.
class ParseFrenchFloat
  # @param from [Object] key of the field holding the raw text (e.g. "10,96")
  # @param to [Object] key under which the parsed Float is stored
  def initialize(from:, to:)
    @from, @to = from, to
  end

  # Stores the parsed number under the target key and returns the row.
  # Float() raises ArgumentError when the text is not a valid number.
  def process(row)
    dotted = row[@from].tr(',', '.')
    row[@to] = Float(dotted)
    row
  end
end
# Transform that converts a day/month/year date string to Date#to_s form.
class ParseFrenchDate
  # @param from [Object] key of the field holding the raw date (e.g. "7/3/2015")
  # @param to [Object] key under which the reformatted date string is stored
  def initialize(from:, to:)
    @from, @to = from, to
  end

  # Parses the source field with the '%d/%m/%Y' pattern, stores the result
  # as a string (e.g. "2015-03-07") under the target key, and returns the row.
  def process(row)
    parsed = Date.strptime(row[@from], '%d/%m/%Y')
    row[@to] = parsed.to_s
    row
  end
end
# Transform that moves a value from one key of the row to another.
class RenameField
  # @param from [Object] key to remove from the row
  # @param to [Object] key that receives the removed value
  def initialize(from:, to:)
    @from, @to = from, to
  end

  # Deletes the source key and stores its value (nil when the key was
  # absent) under the target key, then returns the row.
  def process(row)
    moved = row.delete(@from)
    row[@to] = moved
    row
  end
end
require 'facets/kernel/blank'
# Transform that aborts the run when an expected field has a blank value.
class VerifyFieldsPresence
  # @param expected_fields [Array] row keys whose values must not be blank
  def initialize(expected_fields)
    @expected_fields = expected_fields
  end

  # Checks the fields in order and raises on the first one whose value
  # answers true to +blank?+; otherwise returns the row untouched.
  def process(row)
    @expected_fields.each do |name|
      raise "Row lacks value for field #{name} - #{row.inspect}" if row[name].blank?
    end
    row
  end
end
# Kiba destination that writes selected fields of each row to a CSV file.
class CsvDestination
  # Opens +file+ in 'w' mode and immediately writes the header line.
  #
  # @param file [String] path of the CSV file to write
  # @param output_fields [Array] row keys to emit, in column order
  def initialize(file, output_fields)
    @csv = CSV.open(file, 'w')
    @output_fields = output_fields
    @csv << @output_fields
  end

  # Appends one line holding the row's values for the output fields.
  # Raises (see #verify_row!) when the row lacks any of those keys.
  def write(row)
    verify_row!(row)
    @csv << row.values_at(*@output_fields)
  end

  # Raises a RuntimeError naming every output field missing from the row's
  # keys. Only key presence is checked; a nil value is accepted.
  def verify_row!(row)
    # A plain array difference is enough; the former wrap-and-flatten of
    # the intersection added nothing and would mangle array-valued keys.
    missing_fields = @output_fields - row.keys
    return if missing_fields.empty?

    raise "Row lacks required field(s) #{missing_fields}\n#{row}"
  end

  # Closes the underlying CSV file.
  def close
    @csv.close
  end
end
[root@h102 kiba]# vim convert-csv.etl
[root@h102 kiba]# cat convert-csv.etl
require_relative 'common'

# Source: the semicolon-separated input file, header names turned into symbols.
source CsvSource, 'commandes.csv', col_sep: ';', headers: true, header_converters: :symbol

# Stop the run if any source column has a blank value.
transform VerifyFieldsPresence, %i[date_facture montant_eur numero_commande]

# Amount: decimal-comma text -> Float, stored under a new key.
transform ParseFrenchFloat, from: :montant_eur, to: :amount_eur

# Date: day/month/year text -> "YYYY-MM-DD" string, stored under a new key.
transform ParseFrenchDate, from: :date_facture, to: :invoice_date

# Order number: same value, renamed key.
transform RenameField, from: :numero_commande, to: :invoice_number

# Destination: CSV file restricted to the converted columns.
output_fields = %i[invoice_number invoice_date amount_eur]
destination CsvDestination, 'orders.csv', output_fields

# Print each row for inspection.
show_me
[root@h102 kiba]# bundle exec kiba convert-csv.etl
{
:date_facture => "7/3/2015",
:montant_eur => "10,96",
:amount_eur => 10.96,
:invoice_date => "2015-03-07",
:invoice_number => "FA1986"
}
{
:date_facture => "7/3/2015",
:montant_eur => "23,12",
:amount_eur => 23.12,
:invoice_date => "2015-03-07",
:invoice_number => "FA1987"
}
{
:date_facture => "8/3/2015",
:montant_eur => "6,41",
:amount_eur => 6.41,
:invoice_date => "2015-03-08",
:invoice_number => "FA1988"
}
[root@h102 kiba]# ls
commandes.csv common.rb convert-csv.etl Gemfile Gemfile.lock orders.csv
[root@h102 kiba]# cat orders.csv
invoice_number,invoice_date,amount_eur
FA1986,2015-03-07,10.96
FA1987,2015-03-07,23.12
FA1988,2015-03-08,6.41
[root@h102 kiba]#
到此,一个简单的基于 CSV 源和目标的 ETL 就实现了,下次有机会再分享一下,如何使用 MySQL、Elasticsearch 或 MongoDB 来实现相互之间的 ETL
上面的实例中已经涵盖了 source、transform、process、destination 的定义和应用,其实还有 pre_process 和 post_process 可以定义,它们分别是在 ETL 处理第一行数据之前执行的代码块和 ETL 处理完成最后一行数据之后执行的代码块,详细可以参考 官方文档,有机会再单独分享
gem --version
mkdir kiba
cat Gemfile
irb
bundle install
echo "puts 'Hello from Kiba'" > convert-csv.etl
bundle exec kiba convert-csv.etl
vim common.rb
vim commandes.csv
vim convert-csv.etl
bundle exec kiba convert-csv.etl
vim Gemfile
vim commandes.csv
cat orders.csv
原文地址
本文系转载,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文系转载,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。