前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ETL (Extract-Transform-Load) with Kiba(4)

ETL (Extract-Transform-Load) with Kiba(4)

作者头像
franket
发布2021-10-18 14:53:32
4670
发布2021-10-18 14:53:32
举报
文章被收录于专栏:技术杂记

因为我们的数据都符合预期,所以没有报出异常,现在故意修改一下源数据

将第二条数据的价格删除,然后再运行ETL脚本

代码语言:javascript
复制
[root@h102 kiba]# vim commandes.csv 
[root@h102 kiba]# cat commandes.csv 
date_facture;montant_eur;numero_commande
7/3/2015;10,96;FA1986
7/3/2015;;FA1987
8/3/2015;6,41;FA1988
[root@h102 kiba]# bundle exec kiba convert-csv.etl
{
      :date_facture => "7/3/2015",
       :montant_eur => "10,96",
        :amount_eur => 10.96,
      :invoice_date => "2015-03-07",
    :invoice_number => "FA1986"
}
/root/kiba/common.rb:76:in `block in process': Row lacks value for field montant_eur - {:date_facture=>"7/3/2015", :montant_eur=>nil, :numero_commande=>"FA1987"} (RuntimeError)
	from /root/kiba/common.rb:74:in `each'
	from /root/kiba/common.rb:74:in `process'
	from /usr/local/rvm/gems/ruby-2.3.0/gems/kiba-0.6.1/lib/kiba/runner.rb:35:in `block (3 levels) in process_rows'
	from /usr/local/rvm/gems/ruby-2.3.0/gems/kiba-0.6.1/lib/kiba/runner.rb:34:in `each'
	from /usr/local/rvm/gems/ruby-2.3.0/gems/kiba-0.6.1/lib/kiba/runner.rb:34:in `block (2 levels) in process_rows'
	from /root/kiba/common.rb:11:in `block in each'
	from /usr/local/rvm/rubies/ruby-2.3.0/lib/ruby/2.3.0/csv.rb:1748:in `each'
	from /usr/local/rvm/rubies/ruby-2.3.0/lib/ruby/2.3.0/csv.rb:1131:in `block in foreach'
	from /usr/local/rvm/rubies/ruby-2.3.0/lib/ruby/2.3.0/csv.rb:1282:in `open'
	from /usr/local/rvm/rubies/ruby-2.3.0/lib/ruby/2.3.0/csv.rb:1130:in `foreach'
	from /root/kiba/common.rb:10:in `each'
	from /usr/local/rvm/gems/ruby-2.3.0/gems/kiba-0.6.1/lib/kiba/runner.rb:33:in `block in process_rows'
	from /usr/local/rvm/gems/ruby-2.3.0/gems/kiba-0.6.1/lib/kiba/runner.rb:32:in `each'
	from /usr/local/rvm/gems/ruby-2.3.0/gems/kiba-0.6.1/lib/kiba/runner.rb:32:in `process_rows'
	from /usr/local/rvm/gems/ruby-2.3.0/gems/kiba-0.6.1/lib/kiba/runner.rb:13:in `run'
	from /usr/local/rvm/gems/ruby-2.3.0/gems/kiba-0.6.1/lib/kiba/cli.rb:13:in `run'
	from /usr/local/rvm/gems/ruby-2.3.0/gems/kiba-0.6.1/bin/kiba:5:in `<top (required)>'
	from /usr/local/rvm/gems/ruby-2.3.0/bin/kiba:23:in `load'
	from /usr/local/rvm/gems/ruby-2.3.0/bin/kiba:23:in `<main>'
	from /usr/local/rvm/gems/ruby-2.3.0/bin/ruby_executable_hooks:15:in `eval'
	from /usr/local/rvm/gems/ruby-2.3.0/bin/ruby_executable_hooks:15:in `<main>'
[root@h102 kiba]# 

第一条得到成功的处理,但是第二就信息就报错了

报错 Row lacks value for field montant_eur - {:date_facture=>"7/3/2015", :montant_eur=>nil, :numero_commande=>"FA1987"} (RuntimeError)

运行符合预期


定义数据去向

代码语言:javascript
复制
[root@h102 kiba]# vim commandes.csv 
[root@h102 kiba]# cat commandes.csv 
date_facture;montant_eur;numero_commande
7/3/2015;10,96;FA1986
7/3/2015;23,12;FA1987
8/3/2015;6,41;FA1988
[root@h102 kiba]# vim common.rb 
[root@h102 kiba]# cat common.rb 
require 'csv'

class CsvSource
  def initialize(file, options)
    @file = file
    @options = options
  end
  
  def each
    CSV.foreach(@file, @options) do |row|
      yield row.to_hash
    end
  end
end


require 'awesome_print'

def show_me
  transform do |row|
    ap row
    row # always return the row to keep it in the pipeline
  end
end


class ParseFrenchFloat
  def initialize(from:, to:)
    @from = from
    @to = to
  end
  
  def process(row)
    row[@to] = Float(row[@from].gsub(',', '.'))
    row
  end
end


class ParseFrenchDate
  def initialize(from:, to:)
    @from = from
    @to = to
  end
  
  def process(row)
    row[@to] = Date.strptime(row[@from], '%d/%m/%Y').to_s
    row
  end
end


class RenameField
  def initialize(from:, to:)
    @from = from
    @to = to
  end
  
  def process(row)
    row[@to] = row.delete(@from)
    row
  end
end


require 'facets/kernel/blank'

class VerifyFieldsPresence
  def initialize(expected_fields)
    @expected_fields = expected_fields
  end
  
  def process(row)
    @expected_fields.each do |field|
      if row[field].blank?
        raise "Row lacks value for field #{field} - #{row.inspect}"
      end
    end
    row
  end
end


class CsvDestination
  def initialize(file, output_fields)
    @csv = CSV.open(file, 'w')
    @output_fields = output_fields

    @csv << @output_fields
  end
  
  def write(row)
    verify_row!(row)
    @csv << row.values_at(*@output_fields) #*
  end

  def verify_row!(row)
    missing_fields = @output_fields - [row.keys & @output_fields].flatten

    if missing_fields.size > 0
      raise "Row lacks required field(s) #{missing_fields}\n#{row}"
    end
  end

  def close
    @csv.close
  end
end

[root@h102 kiba]# vim convert-csv.etl 
[root@h102 kiba]# cat convert-csv.etl 
require_relative 'common'

# read from source CSV file
source CsvSource, 'commandes.csv', col_sep: ';', headers: true, header_converters: :symbol

#verify the source columns are there and provide a non-blank value
transform VerifyFieldsPresence, [:date_facture, :montant_eur, :numero_commande]

# Parse the numbers
transform ParseFrenchFloat, from: :montant_eur, to: :amount_eur

#Reformat the dates
transform ParseFrenchDate, from: :date_facture, to: :invoice_date

#Rename the remaining column
transform RenameField, from: :numero_commande, to: :invoice_number

#define CSV destination
output_fields = [:invoice_number, :invoice_date, :amount_eur]
destination CsvDestination, 'orders.csv', output_fields

# show details of row contents
show_me
[root@h102 kiba]# bundle exec kiba convert-csv.etl
{
      :date_facture => "7/3/2015",
       :montant_eur => "10,96",
        :amount_eur => 10.96,
      :invoice_date => "2015-03-07",
    :invoice_number => "FA1986"
}
{
      :date_facture => "7/3/2015",
       :montant_eur => "23,12",
        :amount_eur => 23.12,
      :invoice_date => "2015-03-07",
    :invoice_number => "FA1987"
}
{
      :date_facture => "8/3/2015",
       :montant_eur => "6,41",
        :amount_eur => 6.41,
      :invoice_date => "2015-03-08",
    :invoice_number => "FA1988"
}
[root@h102 kiba]# ls
commandes.csv  common.rb  convert-csv.etl  Gemfile  Gemfile.lock  orders.csv
[root@h102 kiba]# cat orders.csv 
invoice_number,invoice_date,amount_eur
FA1986,2015-03-07,10.96
FA1987,2015-03-07,23.12
FA1988,2015-03-08,6.41
[root@h102 kiba]# 

到此,一个简单的基于 CSV 源和目标的 ETL 就实现了,下次有机会再分享一下,如何使用 Mysql 或 Elasticsearch 或 Mongodb 来实现相互之间的 ETL

上面的实例中已经涵盖了 source、transform、process、destination 的定义和应用,其实还有 pre_process 和 post_process 可以定义,它们分别是在 ETL 处理第一行数据之前执行的代码块和 ETL 处理完成最后一行数据之后执行的代码块,详细可以参考 官方文档,有机会再单独分享


命令汇总

  • gem --version
  • mkdir kiba
  • cat Gemfile
  • irb
  • bundle install
  • echo "puts 'Hello from Kiba'" > convert-csv.etl
  • bundle exec kiba convert-csv.etl
  • vim common.rb
  • vim commandes.csv
  • vim convert-csv.etl
  • bundle exec kiba convert-csv.etl
  • vim Gemfile
  • vim commandes.csv
  • cat orders.csv

原文地址

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 定义数据去向
  • 命令汇总
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档