
Scrapy_Study02

Case study: JD book spider

The JD book site is fairly easy to crawl; most of the work is in extracting the data.

Spider code:

import scrapy
from jdbook.pipelines import JdbookPipeline
import re
from copy import deepcopy


class JdbookspiderSpider(scrapy.Spider):
    name = 'jdbookspider'
    allowed_domains = ['jd.com']
    start_urls = ['https://book.jd.com/booksort.html']
    # Handle the category page
    def parse(self, response):
        # Use selenium to visit the JD book site first: a plain GET request to
        # the JD book pages returns only a pile of JS with no useful HTML, so we
        # let selenium load the page normally and hand its page_source (the
        # current page's HTML, a selenium property) back to the spider.
        # Process the top-level category list page.
        response_data, driver = JdbookPipeline.gain_response_data(url='https://book.jd.com/booksort.html')
        driver.quit()  # quit() also ends the browser process, unlike close()
        item = {}
        # selenium's page_source is a plain string, not a Scrapy response, so
        # xpath is unavailable here; use regex instead (bs4 plus regex also works)
        middle_group_link = re.findall('<em>.*?<a href="(.*?)">.*?</a>.*?</em>', response_data, re.S)
        big_group_name = re.findall('<dt>.*?<a href=".*?">(.*?)</a>.*?<b>.*?</b>.*?</dt>', response_data, re.S)
        big_group_link = re.findall('<dt>.*?<a href=".*?channel.jd.com/(.*?)\.html">.*?</a>.*?<b>.*?</b>.*?</dt>', response_data, re.S)
        middle_group_name = re.findall('<em>.*?<a href=".*?">(.*?)</a>.*?</em>', response_data, re.S)
        for i in range(len(middle_group_link)):
            # Rebuild each category href into a list-page URL, e.g.
            # //list.jd.com/1713-3259-3330.html -> https://list.jd.com/list.html?cat=1713,3259,3330
            var = str(middle_group_link[i])
            var1 = var[:var.find("com") + 4]
            var2 = var[var.find("com") + 4:]
            var3 = var2.replace("-", ",").replace(".html", "")
            var_end = "https:" + var1 + "list.html?cat=" + var3
            for j in range(len(big_group_name)):
                # Match the sub-category back to its big category via the cat ids
                temp_ = var_end.find(str(big_group_link[j]).replace("-", ","))
                if temp_ != -1:
                    item["big_group_name"] = big_group_name[j]
                    item["big_group_link"] = big_group_link[j]
            item["middle_group_link"] = var_end
            item["middle_group_name"] = middle_group_name[i]
            # Request the list page of each sub-category under the big category
            if var_end is not None:
                yield scrapy.Request(
                    var_end,
                    callback=self.parse_detail,
                    meta={"item": deepcopy(item)}
                )
    # Handle the book list page
    def parse_detail(self, response):
        print(response.url)
        item = response.meta["item"]
        html = response.body.decode()
        detail_name_list = re.findall('<div class="gl-i-wrap">.*?<div class="p-name">.*?<a target="_blank" title=".*?".*?<em>(.*?)</em>', html, re.S)
        detail_content_list = re.findall(
            '<div class="gl-i-wrap">.*?<div class="p-name">.*?<a target="_blank" title="(.*?)"', html,
            re.S)
        detail_link_list = re.findall('<div class="gl-i-wrap">.*?<div class="p-name">.*?<a target="_blank" title=".*?" href="(.*?)"', html, re.S)
        detail_price_list = re.findall('<div class="p-price">.*?<strong class="J_.*?".*?data-done="1".*?>.*?<em>¥</em>.*?<i>(.*?)</i>', html, re.S)
        # Total number of pages, taken from the pager text
        page_number_end = re.findall('<span class="fp-text">.*?<b>.*?</b>.*?<em>.*?</em>.*?<i>(.*?)</i>.*?</span>', html, re.S)[0]
        print(len(detail_price_list))
        print(len(detail_name_list))
        for i in range(len(detail_name_list)):
            detail_link = detail_link_list[i]
            item["detail_name"] = detail_name_list[i]
            item["detail_content"] = detail_content_list[i]
            item["detail_link"] = "https:" + detail_link
            item["detail_price"] = detail_price_list[i]
            yield item
        # Pagination: JD list pages use odd page numbers (page=3,5,7,...) and
        # an item offset s that steps by 60 per page
        for i in range(int(page_number_end)):
            next_url = item["middle_group_link"] + "&page=" + str(2*(i+1) + 1) + "&s=" + str(60*(i+1)) + "&click=0"
            yield scrapy.Request(
                next_url,
                callback=self.parse_detail,
                meta={"item": deepcopy(item)}
            )

Pipeline code:

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html


# useful for handling different item types with a single interface
import csv

from itemadapter import ItemAdapter
from selenium import webdriver
import time


class JdbookPipeline:
    # Append each item to the csv file
    def process_item(self, item, spider):
        # newline='' prevents blank rows on Windows
        with open('./jdbook.csv', 'a+', encoding='utf-8', newline='') as file:
            fieldnames = ['big_group_name', 'big_group_link', 'middle_group_name', 'middle_group_link', 'detail_name',
                          'detail_content', 'detail_link', 'detail_price']
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writerow(item)
        return item

    def open_spider(self, spider):
        # Write the csv header once when the spider starts
        with open('./jdbook.csv', 'w+', encoding='utf-8', newline='') as file:
            fieldnames = ['big_group_name', 'big_group_link', 'middle_group_name', 'middle_group_link', 'detail_name',
                          'detail_content', 'detail_link', 'detail_price']
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writeheader()
    # Fetch a JD book page normally with the help of selenium
    @staticmethod
    def gain_response_data(url):
        # Raw string so the backslashes in the Windows path are not treated as escapes
        drivers = webdriver.Chrome(r"E:\python_study\spider\data\chromedriver_win32\chromedriver.exe")
        drivers.implicitly_wait(2)
        drivers.get(url)
        drivers.implicitly_wait(2)
        time.sleep(2)
        return drivers.page_source, drivers
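
As the boilerplate comment at the top of this file notes, the pipeline must also be registered in settings.py before it runs. A minimal sketch, assuming the project module is named jdbook (consistent with the import in the spider):

# settings.py (jdbook project) -- enable the CSV pipeline
ITEM_PIPELINES = {
    'jdbook.pipelines.JdbookPipeline': 300,
}
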
Case study: Dangdang book spider

Dangdang is also fairly easy to crawl, but here we combine it with scrapy-redis to crawl the data in a distributed fashion.
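
Because the spider pulls its start requests from Redis (see redis_key in the code below), the crawl is kicked off by pushing the start URL into Redis first. A minimal sketch using the redis-py client, with connection details assumed to match REDIS_URL in settings:

import redis

# Push the start URL under the key the spider listens on (redis_key = "dangdang")
r = redis.StrictRedis(host='127.0.0.1', port=6379)
r.lpush('dangdang', 'http://book.dangdang.com/')

Every spider process started afterwards (scrapy crawl dangdangspider, on one or several machines) pops its requests from this shared queue.

Spider code: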

import urllib
from copy import deepcopy
import scrapy
from scrapy_redis.spiders import RedisSpider
import re


# Inherit from scrapy_redis's RedisSpider instead of the plain Spider class
class DangdangspiderSpider(RedisSpider):
    name = 'dangdangspider'
    allowed_domains = ['dangdang.com']
    # http://book.dangdang.com/
    # start_urls is no longer used either; instead we define a redis_key: the
    # urls the spider should crawl are stored in redis under this key, and the
    # spider fetches its requests from redis at crawl time
    redis_key = "dangdang"

    # Handle the book category page
    def parse(self, response):
        div_list = response.xpath("//div[@class='con flq_body']/div")
        for div in div_list:
            item = {}
            item["b_cate"] = div.xpath("./dl/dt//text()").extract()
            item["b_cate"] = [i.strip() for i in item["b_cate"] if len(i.strip()) > 0]
            # Mid-level category groups
            if len(item["b_cate"]) > 0:
                div_data = str(div.extract())
                dl_list = re.findall('''<dl class="inner_dl" ddt-area="\d+" dd_name=".*?">.*?<dt>(.*?)</dt>''',
                                     div_data, re.S)
                for dl in dl_list:
                    if len(str(dl)) > 100:
                        dl = re.findall('''.*?title="(.*?)".*?''', dl, re.S)
                    item["m_cate"] = str(dl).replace(" ", "").replace("\r\n", "")
                    # Small category groups
                    a_link_list = re.findall(
                        '''<a class=".*?" href="(.*?)" target="_blank" title=".*?" nname=".*?" ddt-src=".*?">.*?</a>''',
                        div_data, re.S)
                    a_cate_list = re.findall(
                        '''<a class=".*?" href=".*?" target="_blank" title=".*?" nname=".*?" ddt-src=".*?">(.*?)</a>''',
                        div_data, re.S)
                    print(a_cate_list)
                    print(a_link_list)
                    for a in range(len(a_link_list)):
                        item["s_href"] = a_link_list[a]
                        item["s_cate"] = a_cate_list[a]

                        if item["s_href"] is not None:
                            yield scrapy.Request(
                                item["s_href"],
                                callback=self.parse_book_list,
                                meta={"item": deepcopy(item)}
                            )
    # Handle the book list page
    def parse_book_list(self, response):
        item = response.meta["item"]
        li_list = response.xpath("//ul[@class='bigimg']/li")
        # TODO improvement: handle the different kinds of book list pages differently
        # if li_list is None:
        #     print(True)

        for li in li_list:
            item["book_img"] = li.xpath('./a[1]/img/@src').extract_first()
            if item["book_img"] is None:
                item["book_img"] = li.xpath("//ul[@class='list_aa ']/li").extract_first()
            item["book_name"] = li.xpath("./p[@class='name']/a/@title").extract_first()
            item["book_desc"] = li.xpath("./p[@class='detail']/text()").extract_first()
            item["book_price"] = li.xpath(".//span[@class='search_now_price']/text()").extract_first()
            item["book_author"] = li.xpath("./p[@class='search_book_author']/span[1]/a/text()").extract_first()
            item["book_publish_date"] = li.xpath("./p[@class='search_book_author']/span[2]/text()").extract_first()
            item["book_press"] = li.xpath("./p[@class='search_book_author']/span[3]/a/text()").extract_first()
            yield item
        # Pagination: extract the next-page link once per list page, not once per book
        next_url = response.xpath("//li[@class='next']/a/@href").extract_first()
        if next_url is not None:
            next_url = urllib.parse.urljoin(response.url, next_url)
            yield scrapy.Request(
                next_url,
                callback=self.parse_book_list,
                meta={"item": item}
            )

Pipeline code:

import csv

from itemadapter import ItemAdapter


class DangdangbookPipeline:
    # Append each item to the csv file
    def process_item(self, item, spider):
        with open('./dangdangbook.csv', 'a+', encoding='utf-8', newline='') as file:
            fieldnames = ['b_cate', 'm_cate', 's_cate', 's_href', 'book_img', 'book_name', 'book_desc', 'book_price', 'book_author', 'book_publish_date', 'book_press']
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writerow(item)
        return item

    def open_spider(self, spider):
        # Write the csv header once when the spider starts
        with open('./dangdangbook.csv', 'w+', encoding='utf-8', newline='') as file:
            fieldnames = ['b_cate', 'm_cate', 's_cate', 's_href', 'book_img', 'book_name', 'book_desc', 'book_price',
                          'book_author', 'book_publish_date', 'book_press']
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writeheader()

Settings code:

BOT_NAME = 'dangdangbook'

SPIDER_MODULES = ['dangdangbook.spiders']
NEWSPIDER_MODULE = 'dangdangbook.spiders'
# Use scrapy-redis's duplicate filter
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# ... and its scheduler; persist the request queue in redis across runs
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
SCHEDULER_PERSIST = True
# LOG_LEVEL = 'WARNING'
# Redis server address
REDIS_URL = 'redis://127.0.0.1:6379'
# Crawl responsibly by identifying yourself (and your website) on the user-agent
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36'

# Obey robots.txt rules
ROBOTSTXT_OBEY = False

ITEM_PIPELINES = {
   'dangdangbook.pipelines.DangdangbookPipeline': 300,
}

Scheduled execution with crontab
[Screenshots of crontab commands on Linux omitted]

The screenshots above show crontab being operated directly on a Linux platform.

In a Python environment, we can use the python-crontab library to drive crontab and set up scheduled tasks.
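
A minimal sketch with python-crontab; the command, project path, and schedule below are illustrative assumptions:

from crontab import CronTab

# Open the current user's crontab
cron = CronTab(user=True)
# The command and project path are placeholders -- adjust to your own project
job = cron.new(command='cd /home/user/dangdangbook && scrapy crawl dangdangspider',
               comment='dangdang-crawl')
job.setall('0 2 * * *')  # run every day at 02:00
cron.write()             # persist the entry, like saving from crontab -e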

Supplement

Code for a custom Excel export file format (an item exporter):

from scrapy.exporters import BaseItemExporter
import xlwt


class ExcelItemExporter(BaseItemExporter):
    def __init__(self, file, **kwargs):
        self._configure(kwargs)
        self.file = file
        # One workbook with a single sheet; items are written row by row
        self.wbook = xlwt.Workbook()
        self.wsheet = self.wbook.add_sheet('scrapy')
        self.row = 0

    def export_item(self, item):
        # Write each serialized field value into the next free row
        fields = self._get_serialized_fields(item)
        for col, v in enumerate(x for _, x in fields):
            self.wsheet.write(self.row, col, v)
        self.row += 1

    def finish_exporting(self):
        # Save the workbook once the feed export finishes
        self.wbook.save(self.file)
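
To use the exporter, register it as a feed format in settings.py. A minimal sketch, assuming the class above is saved as my_exporters.py inside the project package (the module path is a hypothetical placeholder):

# settings.py -- register the custom exporter under an 'excel' format name
FEED_EXPORTERS = {
    'excel': 'dangdangbook.my_exporters.ExcelItemExporter',  # hypothetical module path
}

Items can then be exported with, e.g., scrapy crawl dangdangspider -o books.xls -t excel.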