前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >爬虫实战-豆瓣电影Top250

爬虫实战-豆瓣电影Top250

作者头像
一只大鸽子
发布2022-12-06 09:29:07
5890
发布2022-12-06 09:29:07
举报
文章被收录于专栏:Python基础、进阶与实战

摘要

本文通过requests和re库实现了豆瓣电影top250的爬取。

首先是对书上案例进行学习,了解如何定位网站中我们需要的信息,并使用re提供的正则表达式匹配我们的信息。然后为了加快爬虫速度,我们使用了多进程multiprocessing。最后进入实战,对真实的网站进行爬取。在实战中,我们遇到了一些新问题,需要处理网站的反爬虫机制。

书上案例

《Python3 网络爬虫开发实战》(第二版)作者崔庆才搭建的平台Scrape Center。对爬虫感兴趣的可以看一看。

我们进入第一个案例Scrape | Movie。爬取top250电影。

网站分析

在使用代码爬取前,我们需要分析网站是怎么放置电影信息的:

这里我们先对作者搭建的一个网站进行爬取(学会后我们再对真实的豆瓣爬取):

进入网址https://ssr1.scrape.center,然后F12打开开发者模式:

(注:下图用红色箭头标注的是选择工具,我们点击选择工具然后选定左边的页面元素,就可以在右边看到对应的代码)

点击选择工具,然后选择一个电影,发现每一个电影都是一个div节点,class属性都有el-card

接着看一下详情页。我们点击标题就会进入详情页,我们发现标题节点的外层有一个a节点带有href属性(指向详情/detail/1),我们只要将前缀https://ssr1.scrape.center和其拼起来就得到详情网址了。

现在我们已经知道如何爬取一页的电影了。下面需要翻页,继续爬取下一页:

我们可以发现每一页的规律是https://ssr1.scrape.center/page/页码

只有最后的页码不一样。

代码实现:

在上面的分析之后,我们得到如下思路:

1.进入电影列表页面,获取每个电影的详情URL。

2.对10页电影列表进行相同操作。

代码语言:javascript
复制
import re

import requests
import logging
from urllib.parse import urljoin
# logging 用来打印信息
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s : %(message)s')

BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10

def scrape_page(url):
    logging.info(f'scapying {url}...')
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.text
        logging.error(f'get invalid status code{response.status_code} while scraping{url}')
    except requests.RequestException:
        logging.error(f"error_orrurred while scraping{url}",exc_info=True)

def scrape_index(page):
    index_url = f'{BASE_URL}/page/{page}'
    return scrape_page(index_url)

def parse_index(html):
    pattern = re.compile('<a.*?href="(.*?)".*?class="name">')
    items = re.findall(pattern,html)
    if not items:
        return []
    for item in items:
        detail_url = urljoin(BASE_URL,item)
        logging.info(f"get detail url {detail_url}")
        yield  detail_url


def main():
    for page in range(1,TOTAL_PAGE+1):
        index_html = scrape_index(page)
        detail_urls = parse_index(index_html)
        logging.info(f"detail urls {list(detail_urls)}" )

if __name__ == '__main__':
    main()

现在已经获得了所有电影的详情页的url。下面对每个电影的详情页进行特征提取。

代码语言:javascript
复制
def parse_detail(html):
    cover_pattern = re.compile('class="item.*?<img.*?src="(.*?)".*?class="cover">',re.S)
    name_pattern = re.compile('<h2.*?>(.*?)</h2>')
    categories_pattern = re.compile('<button.*?category.*?<span>(.*?)</span>.*?</button>',re.S)
    published_at_pattern = re.compile('(\d{4}-\d{2}-\d{2}\s?上映)')
    drama_pattern = re.compile('<div.*?drama.*?>.*?<p.*?>(.*?)</p>',re.S)
    score_pattern = re.compile('<p.*?score.*?>(.*?)</p>',re.S)
    cover = re.search(cover_pattern, html).group(1).strip() \
            if re.search(cover_pattern, html) else None
    name = re.search(name_pattern,html).group(1).strip()\
            if re.search(name_pattern,html) else None
    categories = re.findall(categories_pattern,html) if re.findall(categories_pattern,html) else []
    published_at = re.search(published_at_pattern,html).group(1)\
            if re.search(published_at_pattern,html) else None
    drama = re.search(drama_pattern, html).group(1).strip() \
            if re.search(drama_pattern, html) else None
    score = re.search(score_pattern, html).group(1).strip() \
            if re.search(score_pattern, html)  else None
    score = float(score)

    return {
        'cover': cover,
        'name': name,
        'categories': categories,
        'published_at': published_at,
        'drama': drama,
        'score': score
    }

保存电影详情到json文件中:

代码语言:javascript
复制
import json
from os import makedirs
from os.path import exists
RESULT_DIR = 'results'
exists(RESULT_DIR) or makedirs(RESULT_DIR)

def save_data(data):
    name = data.get('name')
    data_path = f'{RESULT_DIR}/{name}.json'
    with open(data_path,'w',encoding='utf-8') as f:
        json.dump(data,f, ensure_ascii=False, indent=2)

为了逻辑清晰,我们定义一个函数scrape_detail用于爬取详情页。

代码语言:javascript
复制
def scrape_detail(url):
    return scrape_page(url)

并且在main函数中调用爬取详情:

代码语言:javascript
复制
def main():
    for page in range(1,TOTAL_PAGE+1):
        index_html = scrape_index(page)
        detail_urls = parse_index(index_html)

        for detail_url in detail_urls:
            logging.info(f"get detail_url {detail_url}")
            detail_html = scrape_detail(detail_url)
            data = parse_detail(detail_html)
            logging.info(f"get detail data {data}")
            save_data(data)

最后附上完整代码:

代码语言:javascript
复制
import re
import time

import requests
import requests
import logging
from urllib.parse import urljoin

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s : %(message)s')

BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10

def scrape_page(url):
    logging.info(f'scapying {url}...')
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.text
        logging.error(f'无效状态码:{response.status_code} while scraping{url}')
    except requests.RequestException:
        logging.error(f"爬取{url}时出错",exc_info=True)
    except Exception as e:
        print("未知错误", e)

def scrape_index(page):
    index_url = f'{BASE_URL}/page/{page}'
    return scrape_page(index_url)

def scrape_detail(url):
    return scrape_page(url)

def parse_index(html):
    pattern = re.compile('<a.*?href="(.*?)".*?class="name">')
    items = re.findall(pattern,html)
    if not items:
        return []
    for item in items:
        detail_url = urljoin(BASE_URL,item)
        logging.info(f"get detail url {detail_url}")
        yield  detail_url




def parse_detail(html):
    cover_pattern = re.compile('class="item.*?<img.*?src="(.*?)".*?class="cover">',re.S)
    name_pattern = re.compile('<h2.*?>(.*?)</h2>')
    categories_pattern = re.compile('<button.*?category.*?<span>(.*?)</span>.*?</button>',re.S)
    published_at_pattern = re.compile('(\d{4}-\d{2}-\d{2}\s?上映)')
    drama_pattern = re.compile('<div.*?drama.*?>.*?<p.*?>(.*?)</p>',re.S)
    score_pattern = re.compile('<p.*?score.*?>(.*?)</p>',re.S)
    cover = re.search(cover_pattern, html).group(1).strip() \
            if re.search(cover_pattern, html) else None
    name = re.search(name_pattern,html).group(1).strip()\
            if re.search(name_pattern,html) else None
    categories = re.findall(categories_pattern,html) if re.findall(categories_pattern,html) else []
    published_at = re.search(published_at_pattern,html).group(1)\
            if re.search(published_at_pattern,html) else None
    drama = re.search(drama_pattern, html).group(1).strip() \
            if re.search(drama_pattern, html) else None
    score = re.search(score_pattern, html).group(1).strip() \
            if re.search(score_pattern, html)  else None
    score = float(score)

    return {
        'cover': cover,
        'name': name,
        'categories': categories,
        'published_at': published_at,
        'drama': drama,
        'score': score
    }

import json
from os import makedirs
from os.path import exists
RESULT_DIR = 'results'
exists(RESULT_DIR) or makedirs(RESULT_DIR)

def save_data(data):
    name = data.get('name')
    data_path = f'{RESULT_DIR}/{name}.json'
    with open(data_path,'w',encoding='utf-8') as f:
        json.dump(data,f, ensure_ascii=False, indent=2)

import multiprocessing

def main(page):
    index_html = scrape_index(page)
    detail_urls = parse_index(index_html)

    for detail_url in detail_urls:
        logging.info(f"get detail_url {detail_url}")
        detail_html = scrape_detail(detail_url)
        data = parse_detail(detail_html)
        logging.info(f"get detail data {data}")
        save_data(data)

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    pages = range(1,TOTAL_PAGE+1)
    pool.map(main, pages)
    pool.close()
    pool.join()

最后我们为了加速,将程序改成多进程multiprocessing,并行爬取10个网页。

然而运行时发现,有些电影没有爬取下来,并且出现报错 服务器拒绝连接,猜测时作者的服务器负载有限,拒绝了一些请求。

豆瓣TOP250

我们用同样的思路去爬取豆瓣TOP250

起始页:

https://movie.douban.com/top250

翻页:

https://movie.douban.com/top250?start=25&filter=

https://movie.douban.com/top250?start=50&filter=

经过测试发现,只需要修改start参数就可以实现翻页。

此外,豆瓣有反爬虫机制,需要给response加上浏览器头

代码语言:javascript
复制
headers = {
    'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36 QIHU 360SE'
}
response = requests.get(url,headers=header)

之后的流程都差不多,主要不同的地方在于写正则表达式匹配我们需要的信息。

完整代码:

代码语言:javascript
复制
import re
import time

import requests
import requests
import logging
from urllib.parse import urljoin

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s : %(message)s')

BASE_URL = 'https://movie.douban.com/top250'
TOTAL_PAGE = 10


def scrape_page(url):
    logging.info(f'scapying {url}...')
    try:
        header = {'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) \
        Chrome/102.0.5005.63 Safari/537.36 Edg/102.0.1245.39"}
        response = requests.get(url, headers=header)
        if response.status_code == 200:
            return response.text
        logging.error(f'无效状态码:{response.status_code} while scraping{url}')
    except requests.RequestException:
        logging.error(f"爬取{url}时出错", exc_info=True)
    except Exception as e:
        print("未知错误", e)


def scrape_index(page):
    index_url = f'{BASE_URL}?start={25 * page}'
    return scrape_page(index_url)


def scrape_detail(url):
    return scrape_page(url)


def parse_index(html):
    pattern = re.compile('<div class="item">.*?<a href="(.*?)"', re.S)
    items = re.findall(pattern, html)
    if not items:
        print("没找到匹配的连接")
        return []
    for item in items:
        detail_url = item
        logging.info(f"get detail url {detail_url}")
        yield detail_url


def parse_detail(html):

    name_pattern = re.compile('<span property="v:itemreviewed">(.*?)</span>', re.S)
    categories_pattern = re.compile('<span property="v:genre">(.*?)</span>', re.S)

    score_pattern = re.compile('<strong class="ll rating_num" property="v:average">(\d.\d)</strong>', re.S)


    name = re.search(name_pattern, html).group(1).strip() \
        if re.search(name_pattern, html) else None
    categories = re.findall(categories_pattern, html) if re.findall(categories_pattern, html) else []

    score = re.search(score_pattern, html).group(1).strip() \
        if re.search(score_pattern, html) else None
    score = float(score) if score else 0

    return {   
        'name': name,
        'categories': categories,
        'score': score
    }


import json
from os import makedirs
from os.path import exists

RESULT_DIR = 'results250'
exists(RESULT_DIR) or makedirs(RESULT_DIR)


def save_data(data):
    name = data.get('name')
    data_path = f'{RESULT_DIR}/{name}.json'
    with open(data_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


import multiprocessing


def main(page):
    index_html = scrape_index(page)
    detail_urls = parse_index(index_html)

    for detail_url in detail_urls:
        logging.info(f"get detail_url {detail_url}")
        detail_html = scrape_detail(detail_url)
        data = parse_detail(detail_html)
        logging.info(f"get detail data {data}")
        save_data(data)


if __name__ == '__main__':
    pool = multiprocessing.Pool()
    pages = range(0, TOTAL_PAGE)

    pool.map(main, pages)
    pool.close() # 关闭进程池,不在接受新的进程
    pool.join()

小结

总结本文用到的技术:

使用requests.get 获取网址内容:

代码语言:javascript
复制
def scrape_page(url):
    logging.info(f'scapying {url}...')
    try:
        header = {'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) \
        Chrome/102.0.5005.63 Safari/537.36 Edg/102.0.1245.39"}
        response = requests.get(url, headers=header)
        if response.status_code == 200:
            return response.text
        logging.error(f'无效状态码:{response.status_code} while scraping{url}')
    except requests.RequestException:
        logging.error(f"爬取{url}时出错", exc_info=True)
    except Exception as e:
        print("未知错误", e)

使用re,re.complie('正则表达式')编写匹配模式,re.search(pattern, html)re.findall(pattern, html)匹配符合pattern的内容。

Python 正则表达式 | 菜鸟教程 (runoob.com)

使用json保存字典数据:

代码语言:javascript
复制
def save_data(data):
    name = data.get('name')
    data_path = f'{RESULT_DIR}/{name}.json'
    with open(data_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

使用multiprocessing库提供的多进程并行,加速爬虫:

代码语言:javascript
复制
    pool = multiprocessing.Pool() # 创建Pool
    pages = range(0, TOTAL_PAGE) 

    pool.map(main, pages) # 创建函数和参数映射
    pool.close() # 关闭进程池,不在接受新的进程
    pool.join() #开始并行

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-06-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 一只大鸽子 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 摘要
  • 书上案例
    • 网站分析
      • 代码实现:
      • 豆瓣TOP250
      • 小结
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档