数据管理:数据收集、整理、组织、维护、检索等操作过程。 数据存储:应数据管理的需要而产生,存储技术的优劣直接影响数据管理的效率。
数据存储技术的发展分为以下四个阶段:
具体过程如图:
关系型数据库:
非关系型数据库:
谷歌开发的分布式文件系统(GFS):
Hadoop分布式文件系统(HDFS):
**关系型数据库:**采用由图灵奖得主、有关系型数据库之父之称的E.F.Codd于1970年提出。
结构化数据:
结构化查询语言:查询和操作关系数据库的语言(Structual Query Language)简称SQL
关系型数据库的优势:
SQLite:轻型嵌入式开源数据库,广泛应用于IOS和Android移动操作系统 Mysql:前三强中唯一开源数据库,在互联网中占据主导地位 Oracle:闭源,支持大并发,大访问量成熟稳定,安全性高 SQL Server:微软出品,目前最流行的数据库之一,通常和.net搭配使用 PostgreSQL:加州大学伯克利分校开发的,完全由社区驱动的开源项目
由于Mysql免费所以Mysql在当今的企业中占据主导地位。
Mysql数据库:
**MySQL的Python接口创建方法
1、导入Pymysql
Pymsql是在Python3中用于连接MySQL服务器的一个库。
导入方式为:import pymysql
通过以上两个对象,可以将采集的数据库保存到Mysql数据库中。
2、建立连接对象connection
conn = pymysql.connect(host="localhost",user="root",passwd="199712",db="hackdata",charset="utf8")
3、游标对象cursor
cursor = conn.cursor()
4、在cursor对象中执行sql语句 创建表,表明为douban
sql = """CREATE TABLE douban (
id int(5) not null auto_increment primary key,
public_name CHAR(100) NOT NULL ,
book_number int(10),
picture_link char(100)
)"""
cursor.execute(sql)
5、在cursor对象中执行sql语句 在数据表中插入值
insert_douban = "INSERT INTO douban
(public_name,book_number,picture_link) VALUES (%s,%s,%s)"
data_douban = (public_name[i],book_number[i],picture_link[i])
cursor.execute(insert_douban,data_douban)
6、插入完毕后提交事务,并关闭游标和连接对象
conn.commit()
cursor.close()
conn.close()
关系型数据库存储结构化数据,类似Excel二维表,表中4列分别为:
我们在实际使用数据时,可以使用SQL语句从数据库中筛选符合条件的数据。 比如选出在售书目数量为0的出版社信息:
select * from douban where book_number = 0
总结:
关系型数据库遇到的瓶颈:
在关系型数据库遇到瓶颈的背景下,非关系型数据库(NoSQL)数据库应运而生:
完整代码为:
import pymysql
import re
import requests
## 建立连接
conn = pymysql.connect(host="localhost",user="root",passwd="199712",db="hackdata",charset="utf8")
#获取游标
cursor = conn.cursor()
# 删除表
cursor.execute("DROP TABLE IF EXISTS douban")
sql = """CREATE TABLE douban (
id int(5) not null auto_increment primary key,
public_name CHAR(100) NOT NULL ,
book_number int(10),
picture_link char(100)
)"""
cursor.execute(sql)
headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:73.0) Gecko/20100101 Firefox/73.0'}
res = requests.get("https://read.douban.com/provider/all",headers=headers).text
pat1='<div class="name">(.*?)</div>'
public_name = re.compile(pat1).findall(res)
pat2='<div class="works-num">(.*?) 部作品在售</div>'
book_number = re.compile(pat2).findall(res)
pat3='<div class="avatar"><img src="(.*?)"'
picture_link=re.compile(pat3).findall(res)
for i in range(len(public_name)):
insert_douban = "INSERT INTO douban(public_name,book_number,picture_link) VALUES (%s,%s,%s)"
data_douban = (public_name[i],book_number[i],picture_link[i])
cursor.execute(insert_douban,data_douban)
conn.commit()
cursor.close()
conn.close()
最终运行结果为:
近年来,NoSQL数据库发展势头非常迅猛。在短短四五年内,NoSQL爆炸性的产生了50-150个新的数据库。(http://nosql-database.org)。
其中MongoDB(一种文档数据库)极度火热。 MongoDB的本地安装:https://www.mongodb.com/download-center?jmp=nav#community Robo3T可视化工具安装:https://robomongo.org/download
文档类似于关系型数据库中的行,但不必为二维表结构,存储更为灵活方便。
Pymongo是在Python3中用于连接MongoDB服务器的一个库。
通过以上两个步骤,将Python爬虫采集的数据保存到MongoDB数据库中。
步骤: 1、引入pymongo库
import pymongo
2、建立连接对象
conn = MongoClient('localhost',27017)
3、自定数据库和集合名称
db = conn.movie_test
collection = db.top250
4、插入数据
collection.insert_one(movie)
下图为事先爬取的豆瓣top250的电影信息,并存储到MongoDB数据库中。
实际使用中,可以使用find命令从数据库中筛选符合条件的数据。 筛选电影《千与千寻》的影片信息:
db.top250.find({'title':'千与千寻'}).pretty()
总结:
上述的完整代码:
import requests
from time import sleep
from pymongo import MongoClient
# 连接(服务器地址、端口号)
conn = MongoClient('localhost',27017)
# 选择数据库
db= conn.movie_test
# 选择表
collection = db.top_250
# 采集数据
url = 'https://movie.douban.com/top250'
for start in range(0,250,25):
r = requests.get(url,params={'start':start,'count':25})
print('processing %s' % r.url)
res = r.json()
for movie in res['subjects']:
collection.insert_one(movie)
print(movie['title'],'saved')
sleep(0.1)
大数据时代的到来促成了分布式文件系统:
单个节点发生故障,备份机制使得HDFS可从其它节点获取数据,系统可持续运行。
允许文件通过网络在多台机器上进行分享。
多台机器同时进行读取或者写入,极大缩短了读写时间。
只支持一个写入者,文件写入后,不能进行修改。
Namenode会限制文件的存储数量。
高吞吐会导致高延迟。
搭建步骤:
登录前端查看集群状态:http://192.168.138.2.100:8088/cluster
bin/hdfs dfs -mkdir /data
将爬取的豆瓣文件放在HDFS中的根目录上:
bin/hdfs dfs -put /home/douban.txt /
FileSystem fs = FileSystem.get(conf);
fs.mkdirs(new Path("/data"))
完整代码如下:
public class test2{
@Test
public void testMKdir() throws Exception{
Configuration conf = new Configuration();
conf.set("fs.defaultFS","hdfs://192.168.2.100:9000");
FileSystem fs = FileSystem.get(conf);
fs.mkdirs(new Path("/data"));
fs.close();
}
}
FileInputStream in = new FileIputStream("D:/hackdata/douban.txt");
FSDataOutputStream out = fs.create(new Path("/data/new"),ture);
IOUtils.copyBytes(in,out,4096,true);
完整代码:
public class test1{
@Test
public void testUpload() throws IOException{
Configuration conf = new Configuration();
conf.set("fs.defaultFS","hdfs://192.168.2.100:9000");
FileSystem fs = FileSystem.get(conf);
FileInputStream in = new FileIputStream("D:/hackdata/douban.txt");
FSDataOutputStream out = fs.create(new Path("/data/new"),ture);
IOUtils.copyBytes(in,out,4096,true);
fs.close();
}
}
open(path)打开指定路径的文件:
FSDataInputStream fis = fs.open(path);
声明字节数组bytes,以字节流方式读入数据:
byte[] bytes = new byte[1024];
字节数组转化为字符串类型,并进行打印:
System.out.println(new String(bytes,0,len));
完整代码:
public class test3{
@Test
public void testRead() throws Exception{
Configuration conf = new Configuration();
conf.set("fs.defaultFS","hdfs://192.168.2.100:9000");
FileSystem fs = FileSystem.get(conf);
Path = new Path("/data/new");
FSDataInputStream fis = fs.open(path);
byte[] bytes = new byte[1024];
int len = -1;
while ((len = fis.read(bytes))!=-1){
System.out.println(new
String(bytes,0,len));
}
fis.close();
}
}
fs.delete(new Path("/data"),ture);
完整代码:
public class test4{
@Test
public void testDelete() throws Exception{
Configuration conf = new Configuration();
conf.set("fs.defaultFS","hdfs://192.168.2.100:9000");
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path("/data"),ture);
fs.close();
}
}