专栏首页华章科技以5个数据库为例,用Python实现数据的提取、转换和加载(ETL)

以5个数据库为例,用Python实现数据的提取、转换和加载(ETL)

导读:每个数据科学专业人员都必须从不同的数据源中提取、转换和加载(Extract-Transform-Load,ETL)数据。

本文将讨论如何使用Python为选定的流行数据库实现数据的ETL。对于关系数据库,选择MySQL,并将Elasticsearch作为文档数据库的例子展开。对于图形数据库,选择Neo4j。对于NoSQL,可参考此前文章中介绍的MongoDB

作者:萨扬·穆霍帕迪亚(Sayan Mukhopadhyay)

如需转载请联系大数据(ID:hzdashuju)

  • ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。
  • Neo4j是一个高性能的,NOSQL图形数据库,它将结构化数据存储在网络上(从数学角度叫做图)而不是表中,是一个嵌入式的、基于磁盘的、具备完全的事务特性的Java持久化引擎。

01 MySQL

MySQLdb是在MySQL C接口上面开发的Python API。

1. 如何安装MySQLdb

首先,需要在计算机上安装Python MySQLdb模块。然后运行以下脚本:

#!/usr/bin/python
import MySQLdb

如果出现导入错误,则表示模块未正确安装。

以下是安装MySQL Python模块的说明:

$gunzip MySQL-python-1.2.2.tar.gz
$tar –xvf MySQL-python-1.2.2.tar
$cd MySQL-python-1.2.2
$python setup.py build
$python setup.py install

2. 数据库连接

在连接到MySQL数据库之前,请确保有以下内容。

  • 有一个名为TEST的数据库。
  • 在TEST数据库中有一个表STUDENT。
  • STUDENT表有三个字段:NAME、SUR_NAME和ROLL_NO。
  • 用户对TEST数据库具有完全访问权限。

3. INSERT操作

以下代码执行SQL INSERT语句,以便在STUDENT表中创建记录:

#!/usr/bin/python
import MySQLdb
# Open database connection
db = MySQLdb.connect("localhost","user","passwd","TEST" )
# prepare a cursor object using cursor() method
cursor = db.cursor()
# Prepare SQL query to INSERT a record into the database.
sql = """INSERT INTO STUDENT(NAME,
         SUR_NAME, ROLL_NO)
         VALUES ('Sayan', 'Mukhopadhyay', 1)"""
try:
   # Execute the SQL command
cursor.execute(sql)
   # Commit your changes in the database
   db.commit()
except:
   # Rollback in case there is any error
   db.rollback()
# disconnect from server
db.close()

4. READ操作

以下代码从STUDENT表中提取数据并打印出来:

#!/usr/bin/python
import MySQLdb
# Open database connection
db = MySQLdb.connect("localhost","user","passwd","TEST" )
# prepare a cursor object using cursor() method
cursor = db.cursor()
# Prepare SQL query to INSERT a record into the database.
sql = "SELECT * FROM STUDENT "
try:
   # Execute the SQL command
cursor.execute(sql)
   # Fetch all the rows in a list of lists.
results = cursor.fetchall()
for row in results:
fname = row[0]
lname = row[1]
id = row[2]
      # Now print fetched result
print "name=%s,surname=%s,id=%d" % \
             (fname, lname, id )
except:
print "Error: unable to fecth data"
# disconnect from server
db.close()

5. DELETE操作

以下代码从TEST中删除id=1的一行数据:

#!/usr/bin/python
import MySQLdb
# Open database connection
db = MySQLdb.connect("localhost","test","passwd","TEST")

#prepare a cursor object using cursor() method
cursor = db.cursor()

# PrepareSQL query to DELETE required records
sql="DELETE FROM STUDENT WHERE ROLL_NO=1"
try:
#Execute the SQL command 
cursor.execute(sql)
#Commit your changes in the database
db.commit()
except:
#Roll back in case there is any error
db.rollback()

#disconnect from server 
db.close()

6. UPDATE操作

以下代码将lastname为Mukhopadhyay的记录更改为Mukherjee:

#!/usr/bin/python
import MySQLdb
# Open database connection
db = MySQLdb.connect("localhost","user","passwd","TEST" )
# prepare a cursor object using
cursor() method cursor = db.cursor()
# Prepare SQL query to UPDATE required records
sql = "UPDATE STUDENT SET SUR_NAME="Mukherjee"
                          WHERE SUR_NAME="Mukhopadhyay""
try:
   # Execute the SQL command
cursor.execute(sql)
   # Commit your changes in the database
db.commit()
except:
   # Rollback in case there is any error
db.rollback()
# disconnect from server
db.close()

7. COMMIT操作

提交操作提供对数据库完成修改命令,并且在此操作之后,无法将其还原。

8. ROLL-BACK操作

如果不能确认对数据的修改同时想要撤回操作,可以使用roll-back()方法。

以下是通过Python访问MySQL数据的完整示例。它将提供将数据存储为CSV文件或MySQL数据库中的数据的完整描述。

import MySQLdb
import sys

out = open('Config1.txt','w')
print "Enter the Data Source Type:"
print "1. MySql"
print "2. Text"
print "3. Exit"

while(1):
       data1 = sys.stdin.readline().strip()
       if(int(data1) == 1):
             out.write("source begin"+"\n"+"type=mysql\n")
             print "Enter the ip:"
             ip = sys.stdin.readline().strip()
             out.write("host=" + ip + "\n")
             print "Enter the database name:"
             db = sys.stdin.readline().strip()
             out.write("database=" + db + "\n")
             print "Enter the user name:"
             usr = sys.stdin.readline().strip()
             out.write("user=" + usr + "\n")
             print "Enter the password:"
             passwd = sys.stdin.readline().strip()
             out.write("password=" + passwd + "\n")
             connection = MySQLdb.connect(ip, usr, passwd, db)
             cursor = connection.cursor()
             query = "show tables"
             cursor.execute(query)
             data = cursor.fetchall()
             tables = []
             for row in data:
                    for field in row:
                           tables.append(field.strip())
             for i in range(len(tables)):
                    print i, tables[i]
             tb = tables[int(sys.stdin.readline().strip())]
             out.write("table=" + tb + "\n")
             query = "describe " + tb
             cursor.execute(query)
             data = cursor.fetchall()
             columns = []
             for row in data:
                    columns.append(row[0].strip())
             for i in range(len(columns)):
                    print columns[i] 
             print "Not index choose the exact column names seperated by coma"
             cols = sys.stdin.readline().strip()
             out.write("columns=" + cols + "\n")

             cursor.close()
             connection.close()
             out.write("source end"+"\n")

             print "Enter the Data Source Type:"
             print "1. MySql"
             print "2. Text"
             print "3. Exit"

       if(int(data1) == 2):
             print "path of text file:"
             path = sys.stdin.readline().strip()
             file = open(path)
             count = 0
             for line in file:
                    print line
                    count = count + 1
                    if count > 3:
                          break
             file.close()
             out.write("source begin"+"\n"+"type=text\n")
             out.write("path=" + path + "\n")
             print "enter delimeter:"
             dlm = sys.stdin.readline().strip()
             out.write("dlm=" + dlm + "\n")
             print "enter column indexes seperated by comma:"
             cols = sys.stdin.readline().strip()
             out.write("columns=" + cols + "\n")
             out.write("source end"+"\n")

             print "Enter the Data Source Type:"
             print "1. MySql"
             print "2. Text"
             print "3. Exit"

       if(int(data1) == 3):
             out.close()
             sys.exit()

02 Elasticsearch

Elasticsearch(ES)低级客户端提供从Python到ES REST端点的直接映射。Elasticsearch的一大优势是为数据分析提供了全栈解决方案。Elasticsearch作为数据库,有可配置前端Kibana、数据收集工具Logstash以及企业安全工具Shield。

下例具有称为cat、cluster、indices、ingest、nodes、snapshot和tasks的特征,根据任务分别转换为CatClient、ClusterClient、IndicesClient、IngestClient、NodesClient、SnapshotClient和TasksClient实例。这些实例是访问这些类及其方法的唯一方式。

你可以指定自己的连接类,可以通过提供的connection_class参数来使用。

# create connection to local host using the ThriftConnection
Es1=Elasticsearch(connection_class=ThriftConnection)

如果你想打开sniffing,那么有几个选择:

# create connection that will automatically inspect the cluster to get
# the list of active nodes. Start with nodes running on 'esnode1' and
# 'esnode2'
Es1=Elasticsearch(
    ['esnode1', 'esnode2'],
# sniff before doing anything
sniff_on_start=True,
# refresh nodes after a node fails to respond
sniff_on_connection_fail=True,
# and also every 30 seconds
sniffer_timeout=30
)

不同的主机可以有不同的参数,你可以为每个节点使用一个字典来指定它们。

# connect to localhost directly and another node using SSL on port 443
# and an url_prefix. Note that ``port`` needs to be an int.
Es1=Elasticsearch([
{'host':'localhost'},
{'host':'othernode','port':443,'url_prefix':'es','use_ssl':True},
])

还支持SSL客户端身份验证(有关选项的详细说明,请参阅Urllib3HttpConnection)。

Es1=Elasticsearch(
['localhost:443','other_host:443'],
# turn on SSL
use_ssl=True,
# make sure we verify SSL certificates (off by default)
verify_certs=True,
# provide a path to CA certs on disk
ca_certs='path to CA_certs',
# PEM formatted SSL client certificate
client_cert='path to clientcert.pem',
# PEM formatted SSL client key
client_key='path to clientkey.pem'
)
  • 连接层API

许多类负责处理Elasticsearch集群。这里可以通过将参数传递给Elasticsearch类来忽略正在使用的默认子类。属于客户端的每个参数都将添加到Transport、ConnectionPool和Connection上。

例如,如果你要使用定制的ConnectionSelector类,只需传入selector_class参数即可。

整个API以很高的精确度包装了原始REST API,其中包括区分调用必需参数和可选参数。这意味着代码区分了按排位的参数和关键字参数。建议读者使用关键字参数来保证所有调用的一致性和安全性。

如果Elasticsearch返回2XX,则API调用成功(并将返回响应)。否则,将引发TransportError(或更具体的子类)的实例。你可以在异常中查看其他异常和错误状态。如果你不希望引发异常,可以通过传入ignore参数忽略状态代码或状态代码列表。

from elasticsearch import Elasticsearch
es=Elasticsearch()
# ignore 400 cause by IndexAlreadyExistsException when creating an index
es.indices.create(index='test-index',ignore=400)
# ignore 404 and 400
es.indices.delete(index='test-index',ignore=[400,404])

03 Neo4j Python驱动

Neo4j支持Neo4j Python驱动,并通过二进制协议与数据库连接。它试图保持简约及Python的惯用方式。

pip install neo4j-driver
from neo4j.v1 import GraphDatabase, basic_auth
driver11 = GraphDatabase.driver("bolt://localhost", auth=basic_auth("neo4j", "neo4j"))
session11 = driver11.session()
session11.run("CREATE (a:Person {name:'Sayan',title:'Mukhopadhyay'})")
result11= session11.run("MATCH (a:Person) WHERE a.name ='Sayan' RETURN a.name AS name, a.title AS title")
for recordi n result11:
print("%s %s"% (record["title"], record["name"]))
session11.close()

04 neo4j-rest-client

neo4j-rest-client的主要目标是确保已经使用本地Neo4j的Python程序员通过python-embedded的方式也能够访问Neo4j REST服务器。因此,neo4j-rest-client API的结构与python-embedded完全同步。但是引入了一种新的结构,以达到更加Python化的风格,并通过Neo4j团队引入的新特性来增强API。

05 内存数据库

另一个重要的数据库类是内存数据库。它在RAM中存储和处理数据。因此,对数据库的操作非常快,并且数据是灵活的。SQLite是内存数据库的一个流行范例。在Python中,需要使用sqlalchemy库来操作SQLite。在第1章的Flask和Falcon示例中,展示了如何从SQLite中选择数据。以下将展示如何在SQLite中存储Pandas数据框架:

from sqlalchemy import create_engine
import sqlite3
conn = sqlite3.connect('multiplier.db')
conn.execute('''CREATE TABLE if not exists multiplier
       (domain        CHAR(50),
        low        REAL,
        high        REAL);''')
conn.close()
db_name = "sqlite:///" + prop + "_" + domain + str(i) + ".db"
disk_engine = create_engine(db_name)
df.to_sql('scores', disk_engine, if_exists='replace')

06 Python版本MongoDB

这部分内容请见此前的文章数据处理入门干货:MongoDB和pandas极简教程

关于作者:Sayan Mukhopadhyay拥有超过13年的行业经验,并与瑞信、PayPal、CA Technologies、CSC和Mphasis等公司建立了联系。他对投资银行、在线支付、在线广告、IT架构和零售等领域的数据分析应用有着深刻的理解。他的专业领域是在分布式和数据驱动的环境(如实时分析、高频交易等)中,实现高性能计算。

本文摘编自《Python高级数据分析:机器学习、深度学习和NLP实例》,经出版方授权发布。

本文分享自微信公众号 - 大数据(hzdashuju),作者:萨扬·穆霍帕迪亚

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-04-18

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 干货:可视化项目实战经验分享,轻松玩转Bokeh(建议收藏)

    导读:本文通过一个项目案例,详细的介绍了如何从 Bokeh 基础到构建 Bokeh 交互式应用程序的过程,内容循序渐进且具有很高的实用性。本文共有两万字左右,属...

    华章科技
  • 干货 :数据挖掘中易犯的11大错误

    5. 使用了未来的信息(Accept Leaks from the Future)

    华章科技
  • 从大数据中挖掘什么?

    大数据挖掘中最重要的是决定挖掘什么样的知识,这是在数据的收集、处理、挖掘的整个过程中都需要认真考虑的问题。本文首先提出大数据挖掘的几项策略,即尽量设想挖掘的场景...

    华章科技
  • POJ-1276-Cash Machine(多重背包)

    Cash Machine Time Limit: 1000MS Memory Limit: 10000K Total Submissions:...

    ShenduCC
  • CodeForces 157A Game Outcome

    A. Game Outcome time limit per test 2 seconds memory limit per test 256 me...

    ShenduCC
  • 参数化与人工智能,从计算机辅助到计算机决策,同济大学DigitalFuture演讲记录

    这是他在同济大学DigitalFuture演讲稿,为我们介绍了人工智能在建筑领域的应用。欢迎大家关注他的公众号(见文末)

    mixlab
  • 数据仓库专题(23):总线矩阵的另类应用-Drill Down into a More Detailed Bus Matrix

    Many of you are already familiar with the data warehouse bus architecture and m...

    数据饕餮
  • hdu 2473 Junk-Mail Filter (并查集之点的删除)

    Junk-Mail Filter Time Limit: 15000/8000 MS (Java/Others)    Memory Limit: 32768/...

    Gxjun
  • US oil prices turn negative as demand dries up

    The price of US oil has turned negative for the first time in history.

    仇诺伊
  • 超弱局部间断伽辽金法研究具有高阶空间导数的偏微分方程(CS NA)

    本文提出了一种新的不连续伽辽金方法,用于求解具有高阶空间导数的几类偏微分方程。将局部间断伽辽金法和超弱间断伽辽金法的优点结合起来。首先将具有高阶空间导数的偏微分...

    非过度曝光

扫码关注云+社区

领取腾讯云代金券