"""豆瓣电影 TOP250 爬虫脚本。

偶然间看到豆瓣电影的TOP250榜单,于是突发奇想写了这个爬虫脚本。将通过爬取豆瓣电影TOP250的榜单列表获取电影详情页的URL,然后再爬取电影详情页URL中的内容,最终获得电影的名称、导演、演员、类别、制片国家/地区、语言、上映日期、片长和剧情简介等信息,经过一系列的处理后输出。

最终结果将以JSON形式输出为当前运行目录下的 result.json。
"""
import requests
import threading
import re
import queue
import json
# Global HTTP Header Settings
# Browser-like User-Agent sent with every request (douban rejects bare clients).
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
                  "Chrome/78.0.3904.108 Safari/537.36",
}
# Shared mutable state used by the worker threads below.
detailUrls = []   # filled by fetchPage: [{"num": rank, "url": detail_page_url}, ...]
exitFlag = False  # NOTE(review): never read anywhere in the visible code — looks dead
jsonResult = []   # parsed movie dicts collected by detailJob, written out at the end
# Retrieve the detail urls from a Top250 list page
def getDetailUrl(source):
    """Return every movie detail-page URL found in *source* (list-page HTML)."""
    linkExpr = re.compile(r'<div class="hd">[\s]+<a href="(.*?)"', re.S | re.M)
    return linkExpr.findall(source)
# Get the movie details
def getMovieDetail(source):
    """Parse a douban movie detail page into a metadata dict.

    source: HTML text of one movie detail page.
    Returns a dict with keys num (placeholder, filled in by the caller),
    title, director, stars, category, country, language, release_time,
    runtime and description — or None when a mandatory field is missing
    from the page.
    """
    # Regex Patterns
    titlePattern = r'v:itemreviewed">(.*?)</span>'
    directorPattern = r'rel="v:directedBy">(.*?)</a>'
    starPattern = r'rel="v:starring">(.*?)</a>'
    categoryPattern = r'property="v:genre">(.*?)</span>'
    countryPattern = r'<span class="pl">制片国家/地区:</span>[\s]+(.*?)<'
    languagePattern = r'<span class="pl">语言:</span>[\s]+(.*?)<'
    releaseTimePattern = r'v:initialReleaseDate".*?>(.*?)</span>'
    runtimePattern = r'v:runtime"[\s]+content="(.*?)"'
    descriptionPattern = r'property="v:summary".*?>[\s]+(.*?)</span>'
    # Match Results (findall for repeated fields, search for unique ones)
    titleMatch = re.search(titlePattern, source, re.S | re.M)
    directorMatch = re.findall(directorPattern, source, re.S | re.M)
    starMatch = re.findall(starPattern, source, re.S | re.M)
    categoryMatch = re.findall(categoryPattern, source, re.S | re.M)
    countryMatch = re.search(countryPattern, source, re.S | re.M)
    languageMatch = re.search(languagePattern, source, re.S | re.M)
    releaseTimeMatch = re.findall(releaseTimePattern, source, re.S | re.M)
    runtimeMatch = re.search(runtimePattern, source, re.S | re.M)
    descriptionMatch = re.search(descriptionPattern, source, re.S | re.M)
    # Build the Result Dict
    try:
        result = {
            "num": "",  # ranking number, filled in later by the caller
            "title": titleMatch.group(1),
            "director": "/".join(directorMatch),
            "stars": "/".join(starMatch),
            "category": "/".join(categoryMatch),
            "country": countryMatch.group(1),
            "language": languageMatch.group(1),
            "release_time": "/".join(releaseTimeMatch),
            "runtime": runtimeMatch.group(1),
            "description": re.sub(r'[\s]{3,}', "", descriptionMatch.group(1))  # Delete the blanks
        }
        return result
    except AttributeError:
        # A re.search above found nothing, so .group(1) was called on None.
        # Narrowed from "except Exception": treat the page as unparseable
        # instead of silently masking unrelated bugs.
        return None
# Fetch the movie details from the detail url
def fetchDetails(detailUrl):
    """Download one movie detail page and parse it; returns a dict or None."""
    response = requests.get(url=detailUrl, headers=headers, timeout=15)
    return getMovieDetail(response.text)
# Fetch the movie list information
def fetchPage(startRecord):
    """Fetch one Top250 list page and record its detail URLs with rank numbers.

    startRecord is douban's paging offset (0, 25, 50, ...); each URL on the
    page is appended to the shared detailUrls list as {"num": rank, "url": u}.
    """
    listUrl = "https://movie.douban.com/top250?start=" + str(startRecord)
    response = requests.get(url=listUrl, headers=headers, timeout=15)
    for offset, detailUrl in enumerate(getDetailUrl(response.text), start=1):
        # Guard the shared list against concurrent appends from sibling threads.
        with lock:
            detailUrls.append({"num": startRecord + offset, "url": detailUrl})
def detailJob():
    """Worker: drain the shared URL queue and collect parsed movie dicts.

    Runs until the queue is empty; each successfully parsed page is tagged
    with its ranking number and appended to the shared jsonResult list.
    """
    while True:
        try:
            # get_nowait() instead of "while not q.empty(): q.get()":
            # with 10 workers, another thread can drain the queue between
            # the empty() check and the get(), leaving get() blocked forever.
            target = q.get_nowait()
        except queue.Empty:
            break
        result = fetchDetails(target['url'])
        if result is not None:
            result['num'] = target['num']
            # list.append is atomic in CPython, so no extra lock is needed here
            jsonResult.append(result)
if __name__ == '__main__':
    # Shared state read by the worker functions above (module-level by design).
    lock = threading.Lock()
    q = queue.Queue()
    threadList = []
    # Phase 1: fetch the 10 list pages (25 movies each) concurrently.
    print("Create and Start the fetch page job")
    for page in range(0, 250, 25):
        t = threading.Thread(target=fetchPage, args=(page,))
        threadList.append(t)
        t.start()
    for t in threadList:
        t.join()
    threadList.clear()
    print("Fetch detail urls finished")
    # Phase 2: queue every detail URL and scrape them with 10 worker threads.
    for url in detailUrls:
        q.put(url)
    print("Start to fetch details")
    for i in range(10):
        t = threading.Thread(target=detailJob)
        threadList.append(t)
        t.start()
    for t in threadList:
        t.join()
    print("Fetch Details Finished")
    print("Start to Write Data")
    # Sort by ranking number so the output follows the Top250 order.
    jsonResult.sort(key=lambda x: x['num'])
    # "w" instead of "a+": appending on a re-run would concatenate two JSON
    # documents into result.json, producing an unparseable file.
    with open("result.json", "w", encoding="utf-8") as fo:
        fo.write(json.dumps(jsonResult, indent=2, ensure_ascii=False))
    print("Over")