WIN环境下
selenium环境安装
pip3 install selenium
还需要下载一个谷歌浏览器对应的chromedriver,下载地址:https://npm.taobao.org/mirrors/chromedriver/
放在python安装的对应目录即可,如下:
至于linux,mac环境可以参考其他的文章
12306页面分析:
在查询票这里,可以看到url的变化,是以拼接的方式构成的完整的url
kyfw.12306.cn/otn/leftTicket/init?linktypeid=dc&fs=上海,SHH&ts=成都,CDW&date=2019-05-15&flag=N,N,Y
上海,SHH SHH为城市编号
成都,CDW CDW为城市编号
date=2019-05-15 为出发的日期
如下图所标:
只要找到各个城市对应编号,构造请求的url,就可以实现查询。
城市编码需要到首页获取:https://www.12306.cn/index/
我这里只获取了热门城市的编号。之前用requests去请求拿不到数据,因为这部分内容是由JS动态加载的,所以还是得上selenium吧,无奈....
完整代码如下:
from selenium import webdriver
import time
import json
class QueryTicket:
    """Scrape the hot-city list from the 12306 home page and save it as JSON."""

    def main(self):
        """Collect ``city name -> city code`` pairs and write them to city_number.txt.

        The file is only (re)written when at least one city was scraped, so a
        failed or empty scrape never replaces a previously good file with ``{}``.

        Returns:
            dict: the scraped mapping; empty when nothing could be scraped.
        """
        # Imported locally: this script's import header only pulls in `webdriver`.
        from selenium.webdriver.common.by import By

        url = 'https://www.12306.cn/index/index.html'
        options = webdriver.ChromeOptions()
        for flag in ('--disable-infobars', '--start-maximized', '--headless'):
            options.add_argument(flag)
        # `options=` is the supported keyword; `chrome_options=` is deprecated.
        browser = webdriver.Chrome(options=options)
        city_number_dict = {}
        try:
            browser.get(url)
            browser.implicitly_wait(20)
            # The city list is rendered by JavaScript; give it time to fill in.
            time.sleep(3)
            # Locate the <li> tags of the hot-city list.
            for item in browser.find_elements(By.XPATH, "//ul[@class='popcitylist']/li"):
                city_name = item.get_attribute('title')
                city_number = item.get_attribute('data')
                # Skip entries that lack either attribute instead of storing None.
                if city_name and city_number:
                    city_number_dict.setdefault(city_name, city_number)
            print(city_number_dict)
            if city_number_dict:
                with open("city_number.txt", encoding='utf-8', mode='w') as f:
                    json.dump(city_number_dict, f, ensure_ascii=False)
            else:
                print('no hot cities found; city_number.txt left unchanged')
        except Exception as e:
            # Best-effort script: report the failure, then still close the browser.
            print(e)
        finally:
            browser.quit()
        return city_number_dict
if __name__ == '__main__':
    # Run the hot-city scrape once when this file is executed as a script.
    QueryTicket().main()
运行结果:
{'北京': 'BJP', '上海': 'SHH', '天津': 'TJP', '重庆': 'CQW', '长沙': 'CSQ', '长春': 'CCT', '成都': 'CDW', '福州': 'FZS', '广州': 'GZQ', '贵阳': 'GIW', '呼和浩特': 'HHC', '哈尔滨': 'HBB', '合肥': 'HFH', '杭州': 'HZH', '海口': 'VUQ', '济南': 'JNK', '昆明': 'KMM', '拉萨': 'LSO', '兰州': 'LZJ', '南宁': 'NNZ', '南京': 'NJH', '南昌': 'NCG', '沈阳': 'SYT', '石家庄': 'SJP', '太原': 'TYV', '乌鲁木齐南': 'WMR', '武汉': 'WHN', '西宁': 'XNO', '西安': 'XAY', '银川': 'YIJ', '郑州': 'ZZF', '深圳': 'SZQ', '厦门': 'XMS'}
拿到城市,以及城市对应的编号,就可以构造请求,获取车票信息了.
只需要把城市,城市编号,出发日 拼接成一个完整的url即可,如下:
https://kyfw.12306.cn/otn/leftTicket/init?linktypeid=dc&fs=上海,SHH&ts=成都,CDW&date=2019-05-15&flag=N,N,Y
这里我又尝试用requests请求,还是没有数据,只能老老实实用selenium吧
引用了一个资源文件,里面放的是user-agent
resource.py内容如下:
#!/usr/bin/env python
# coding: utf-8
# Pool of desktop browser User-Agent strings; callers pick one at random per browser session.
UserAgents = [
    "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:6.0) Gecko/20100101 Firefox/6.0",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50",
    "Opera/9.80 (Windows NT 6.1; U; zh-cn) Presto/2.9.168 Version/11.50",
    "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)",
    # This entry was truncated (ended in ';' with an unclosed parenthesis); closed it.
    "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)",
    "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0)",
    "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0)",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:2.0.1) Gecko/20100101 Firefox/4.0.1",
    "Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1",
    "Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; en) Presto/2.8.131 Version/11.11",
    "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Maxthon 2.0)",
    "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; TencentTraveler 4.0)",
    "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; The World)",
    "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; 360SE)",
    "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)",
    "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Avant Browser)",
]
完整代码如下:
#!/usr/bin/env python
# coding: utf-8
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
import json
import random
import resource
import logging
class QueryTicket:
    """Query 12306 left-ticket results for one route and date with headless Chrome."""

    # Seat-class keys, in the order of the <td> cells 1..11 of a result row.
    _SEAT_LABELS = (
        "商务座特等座",
        "一等座",
        "二等座",
        "高级软卧",
        "软卧一等卧",
        "动卧",
        "硬卧二等卧",
        "软座",
        "硬座",
        "无座",
        "其它",
    )

    @classmethod
    def getRandomHeaders(cls):
        """Return one User-Agent string picked at random from ``resource.UserAgents``."""
        return random.choice(resource.UserAgents)

    def main(self, start_city, end_city, start_time):
        """Run one ticket query.

        Args:
            start_city: departure city name; must be a key of city_number.txt.
            end_city: arrival city name; must be a key of city_number.txt.
            start_time: departure date string placed into the URL (e.g. '2019-05-28').

        Returns:
            dict: ``{"describe": <summary text>, "data": [<one dict per train>]}``.
            Both values stay ``None`` when the cities are unknown, the city file
            cannot be read, or scraping fails.
        """
        result = {
            "describe": None,
            "data": None,
        }
        logging.captureWarnings(True)

        # Resolve the URL first so that bad input never costs a browser launch.
        try:
            url = self.splicing_url(start_city, end_city, start_time)
        except (OSError, ValueError) as e:
            # Missing/unreadable city file, or invalid JSON inside it.
            print(e)
            return result
        if not url:
            return result

        options = webdriver.ChromeOptions()
        options.add_argument('--disable-infobars')
        options.add_argument('--start-maximized')
        # The method must be *called*; formatting the bound method itself would
        # put its repr into the User-Agent header.
        options.add_argument('user-agent={}'.format(self.getRandomHeaders()))
        options.add_argument('--headless')
        # `options=` is the supported keyword; `chrome_options=` is deprecated.
        browser = webdriver.Chrome(options=options)
        try:
            browser.get(url)
            browser.implicitly_wait(10)
            # Wait until at least one row of the result table is present.
            first_row = (By.XPATH, "//tbody[@id='queryLeftTable']/tr")
            WebDriverWait(browser, 20, 0.5).until(EC.presence_of_element_located(first_row))
            # Summary line above the result table.
            result["describe"] = browser.find_element(By.XPATH, "//div[@id='sear-result']/p").text
            table = browser.find_element(By.XPATH, "//tbody[@id='queryLeftTable']")
            # All train rows.
            rows = table.find_elements(By.XPATH, "./tr[@class='bgc'] | tr[@class='']")
            # Assigned only after every row parsed, so a failure leaves data as None.
            result['data'] = [self._parse_row(row) for row in rows]
            return result
        except Exception as e:
            print(e)
            return result
        finally:
            browser.quit()

    def _parse_row(self, row):
        """Turn one result-table <tr> element into a dict of train fields."""
        stations = row.find_elements(By.XPATH, ".//div[@class='cdz']/strong")
        times = row.find_elements(By.XPATH, ".//div[@class='cds']/strong")
        # Fetch the cells once instead of re-running the lookup per seat class.
        cells = row.find_elements(By.XPATH, ".//td")
        record = {
            "车次": row.find_element(By.XPATH, ".//a[@class='number']").text,
            "出发站": stations[0].text,
            "到达站": stations[1].text,
            "出发时间": times[0].text,
            "到达时间": times[1].text,
            "历时总时间": row.find_element(By.XPATH, ".//div[@class='ls']/strong").text,
            "历时是否当日到达": row.find_element(By.XPATH, ".//div[@class='ls']/span").text,
        }
        # Plain indexing on purpose: a row with too few cells raises IndexError.
        for index, label in enumerate(self._SEAT_LABELS, start=1):
            record[label] = cells[index].text
        return record

    def splicing_url(self, start_city, end_city, start_time):
        """Build the left-ticket query URL for the given route and date.

        Reads the ``city name -> city code`` mapping from city_number.txt in the
        current working directory.

        Returns:
            str | bool: the query URL, or ``False`` when either city is not in
            the mapping.
        """
        with open("city_number.txt", encoding='utf-8', mode='r') as f:
            city_number_dict = json.load(f)
        # Both cities need a membership test; `a and b in d` only tests `b`.
        if start_city not in city_number_dict or end_city not in city_number_dict:
            return False
        return 'https://kyfw.12306.cn/otn/leftTicket/init?linktypeid=dc&fs={},{}&ts={},{}&date={}&flag=N,N,Y'.format(
            start_city, city_number_dict[start_city],
            end_city, city_number_dict[end_city],
            start_time,
        )
if __name__ == '__main__':
    # Demo run: query Shanghai -> Wuhan for a fixed date and print the result dict.
    print(QueryTicket().main('上海', '武汉', '2019-05-28'))
运行结果如下:
最后整合代码,使用django rest framework,实现一个基于post请求的查询接口
版本信息:
Django==2.2.1
djangorestframework==3.9.4
django settings.py配置,主要是三个地方,第三个可选
1 在 INSTALLED_APPS下配置rest_framework,如下:
# Enabled Django applications: the django.contrib defaults, the project's
# app01 app, and Django REST framework.
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app01.apps.App01Config',
    'rest_framework',
]
2 在最后添加上rest_framework的配置参数
# Project-wide Django REST framework settings.
REST_FRAMEWORK = {
    # Default versioning scheme; RestApiView overrides it with QueryParameterVersioning.
    'DEFAULT_VERSIONING_CLASS':"rest_framework.versioning.URLPathVersioning",
    'DEFAULT_VERSION': 'v1',
    'ALLOWED_VERSIONS': ['v1', 'v2'],
    'VERSION_PARAM': 'version',
    'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
    'PAGE_SIZE': 1,  # default page size
    # Custom throttle class from app01/utils/throttle.py.
    "DEFAULT_THROTTLE_CLASSES": ["app01.utils.throttle.MyThrottle", ],
    "DEFAULT_THROTTLE_RATES": {
        # The key must equal MyThrottle.scope ("rate").
        "rate": "1/s",
    },
    'DEFAULT_RENDERER_CLASSES': ('rest_framework.renderers.JSONRenderer', ),
}
3 允许所有ip可访问,用于内网测试
# Wildcard entry: the article uses it to allow access from any IP for intranet testing.
ALLOWED_HOSTS = ['*']
django路由配置urls.py
from django.contrib import admin
from django.urls import path
from django.conf.urls import url
from app01.views import RestApiView
# URL routes: the Django admin site and the ticket-query API endpoint.
urlpatterns = [
    path('admin/', admin.site.urls),
    # Matches exactly "api" or "api/". The previous pattern r'^api?' made the
    # "i" optional and was unanchored, so "ap", "apixyz", ... also matched.
    # A query string (e.g. ?version=v1) is not part of the matched path.
    url(r'^api/?$', RestApiView.as_view(), name='api'),
]
django视图配置views.py
from django.shortcuts import render
from rest_framework.versioning import QueryParameterVersioning
from rest_framework.views import APIView
from rest_framework.response import Response
from django.http import JsonResponse
from app01.selenium import query_tick_12306
import json, time, os
# Create your views here.
class RestApiView(APIView):
    """Ticket-query endpoint: GET returns a fixed status payload, POST runs a query."""

    versioning_class = QueryParameterVersioning

    def dispatch(self, request, *args, **kwargs):
        """
        Every incoming request passes through dispatch, which triggers the
        get/post/put/... handler that matches the HTTP method.
        """
        return super().dispatch(request, *args, **kwargs)

    def get(self, request, *args, **kwargs):
        """Return ``{"status": 200}`` as JSON."""
        return JsonResponse({"status": 200})

    def post(self, request, *args, **kwargs):
        """Validate the posted route/date, query 12306 and return the result as JSON.

        Expected body fields: ``start_city``, ``end_city``, ``start_time``
        (date in YYYY-MM-DD form).

        Returns:
            JsonResponse: the dict produced by ``QueryTicket.main`` on success,
            otherwise ``{"status": "error", "message": ...}``.
        """
        # Reject missing/blank fields up front; calling .strip() on a missing
        # field (None) would raise AttributeError and surface as HTTP 500.
        params = {}
        for field in ('start_city', 'end_city', 'start_time'):
            raw = request.data.get(field)
            if not isinstance(raw, str) or not raw.strip():
                return JsonResponse({"status": "error", "message": "缺少参数: {}".format(field)})
            params[field] = raw.strip()
        start_city = params['start_city']
        end_city = params['end_city']

        # Parse the date and normalise it to zero-padded YYYY-MM-DD so that the
        # string comparison below is a correct chronological comparison.
        try:
            travel_day = time.strptime(params['start_time'], '%Y-%m-%d')
        except ValueError:
            return JsonResponse({"status": "error", "message": "日期格式错误"})
        start_time = time.strftime('%Y-%m-%d', travel_day)

        # Reject dates that are already in the past.
        if self.now_time() > start_time:
            return JsonResponse({"status": "error", "message": "过期时间"})

        BASE_DIR = os.path.dirname(os.path.abspath(__file__))
        file_path = os.path.join(BASE_DIR, 'selenium', 'city_number.txt')
        try:
            with open(file_path, encoding='utf-8', mode='r') as f:
                city_number_dict = json.load(f)
        except (OSError, ValueError):
            # City file missing/unreadable or not valid JSON.
            return JsonResponse({"status": "error", "message": "城市数据不可用"}, status=500)

        # Both cities must be present in the scraped city mapping.
        if start_city not in city_number_dict or end_city not in city_number_dict:
            return JsonResponse({"status": "error", "message": "城市错误"})

        url = 'https://kyfw.12306.cn/otn/leftTicket/init?linktypeid=dc&fs={},{}&ts={},{}&date={}&flag=N,N,Y'.format(
            start_city, city_number_dict[start_city],
            end_city, city_number_dict[end_city],
            start_time,
        )
        st = query_tick_12306.QueryTicket()
        ret = st.main(url)
        return JsonResponse(ret)

    def now_time(self):
        """Return today's local date formatted as YYYY-MM-DD."""
        return time.strftime('%Y-%m-%d', time.localtime(time.time()))
创建两个自定义文件夹selenium,utils,完整目录结构如下:
selenium对应爬虫的两个文件,需手动运行获取城市编号的python脚本,另一个是获取车票信息的
get_12306_city.py 获取城市,以及城市编号,需提前手动运行下,会自动生成一个city_number.txt文件
from selenium import webdriver
import time
import json
class QueryTicket:
    """Scrape the hot-city list from the 12306 home page and save it as JSON."""

    def main(self):
        """Collect ``city name -> city code`` pairs and write them to city_number.txt.

        The file is only (re)written when at least one city was scraped, so a
        failed or empty scrape never replaces a previously good file with ``{}``.

        Returns:
            dict: the scraped mapping; empty when nothing could be scraped.
        """
        # Imported locally: this script's import header only pulls in `webdriver`.
        from selenium.webdriver.common.by import By

        url = 'https://www.12306.cn/index/index.html'
        options = webdriver.ChromeOptions()
        for flag in ('--disable-infobars', '--start-maximized', '--headless'):
            options.add_argument(flag)
        # `options=` is the supported keyword; `chrome_options=` is deprecated.
        browser = webdriver.Chrome(options=options)
        city_number_dict = {}
        try:
            browser.get(url)
            browser.implicitly_wait(20)
            # The city list is rendered by JavaScript; give it time to fill in.
            time.sleep(3)
            # Locate the <li> tags of the hot-city list.
            for item in browser.find_elements(By.XPATH, "//ul[@class='popcitylist']/li"):
                city_name = item.get_attribute('title')
                city_number = item.get_attribute('data')
                # Skip entries that lack either attribute instead of storing None.
                if city_name and city_number:
                    city_number_dict.setdefault(city_name, city_number)
            print(city_number_dict)
            if city_number_dict:
                with open("city_number.txt", encoding='utf-8', mode='w') as f:
                    json.dump(city_number_dict, f, ensure_ascii=False)
            else:
                print('no hot cities found; city_number.txt left unchanged')
        except Exception as e:
            # Best-effort script: report the failure, then still close the browser.
            print(e)
        finally:
            browser.quit()
        return city_number_dict
if __name__ == '__main__':
    # Run the hot-city scrape once when this file is executed as a script.
    QueryTicket().main()
query_tick_12306.py 获取车票信息
#!/usr/bin/env python
# coding: utf-8
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
import random
from app01.selenium import resource
import logging
class QueryTicket:
    """Scrape the 12306 left-ticket result page at a given URL with headless Chrome."""

    # Seat-class keys, in the order of the <td> cells 1..11 of a result row.
    _SEAT_LABELS = (
        "商务座特等座",
        "一等座",
        "二等座",
        "高级软卧",
        "软卧一等卧",
        "动卧",
        "硬卧二等卧",
        "软座",
        "硬座",
        "无座",
        "其它",
    )

    @classmethod
    def getRandomHeaders(cls):
        """Return one User-Agent string picked at random from ``resource.UserAgents``."""
        return random.choice(resource.UserAgents)

    def main(self, url):
        """Load the query URL and extract every listed train.

        Args:
            url: fully built left-ticket query URL.

        Returns:
            dict: ``{"describe": <summary text>, "data": [<one dict per train>]}``.
            Values that were not reached stay ``None`` when scraping fails.
        """
        result = {
            "describe": None,
            "data": None,
        }
        logging.captureWarnings(True)
        options = webdriver.ChromeOptions()
        options.add_argument('--disable-infobars')
        options.add_argument('--start-maximized')
        # The method must be *called*; formatting the bound method itself would
        # put its repr into the User-Agent header.
        options.add_argument('user-agent={}'.format(self.getRandomHeaders()))
        options.add_argument('--headless')
        # `options=` is the supported keyword; `chrome_options=` is deprecated.
        browser = webdriver.Chrome(options=options)
        try:
            browser.get(url)
            browser.implicitly_wait(10)
            # Wait until at least one row of the result table is present.
            first_row = (By.XPATH, "//tbody[@id='queryLeftTable']/tr")
            WebDriverWait(browser, 20, 0.5).until(EC.presence_of_element_located(first_row))
            # Summary line above the result table.
            result["describe"] = browser.find_element(By.XPATH, "//div[@id='sear-result']/p").text
            table = browser.find_element(By.XPATH, "//tbody[@id='queryLeftTable']")
            # All train rows.
            rows = table.find_elements(By.XPATH, "./tr[@class='bgc'] | tr[@class='']")
            # Assigned only after every row parsed, so a failure leaves data as None.
            result['data'] = [self._parse_row(row) for row in rows]
            return result
        except Exception as e:
            print(e)
            return result
        finally:
            browser.quit()

    def _parse_row(self, row):
        """Turn one result-table <tr> element into a dict of train fields."""
        stations = row.find_elements(By.XPATH, ".//div[@class='cdz']/strong")
        times = row.find_elements(By.XPATH, ".//div[@class='cds']/strong")
        # Fetch the cells once instead of re-running the lookup per seat class.
        cells = row.find_elements(By.XPATH, ".//td")
        record = {
            "车次": row.find_element(By.XPATH, ".//a[@class='number']").text,
            "出发站": stations[0].text,
            "到达站": stations[1].text,
            "出发时间": times[0].text,
            "到达时间": times[1].text,
            "历时总时间": row.find_element(By.XPATH, ".//div[@class='ls']/strong").text,
            "历时是否当日到达": row.find_element(By.XPATH, ".//div[@class='ls']/span").text,
        }
        # Plain indexing on purpose: a row with too few cells raises IndexError.
        for index, label in enumerate(self._SEAT_LABELS, start=1):
            record[label] = cells[index].text
        return record
# Placeholder: nothing runs when this file is executed directly; the class is
# used through import by the Django view.
if __name__ == '__main__':
    pass
utils里面有一个文件,接口访问限制
throttle.py
#!/usr/bin/env python
# coding: utf-8
"""
自定义的访问限制类
"""
from rest_framework.throttling import SimpleRateThrottle
class MyThrottle(SimpleRateThrottle):
    """Rate throttle whose cache key is the value returned by ``get_ident``."""

    # Scope name; freely chosen, but it is the key looked up in
    # DEFAULT_THROTTLE_RATES of the REST_FRAMEWORK settings ("rate").
    scope = "rate"

    def get_cache_key(self, request, view):
        """Return the identity of the requesting client as the throttle cache key."""
        client_ident = self.get_ident(request)
        return client_ident
所有配置好后,运行django
用postman测试,没有安装的话请自行下载。
先测试过期的时间,
再测试下错误的城市,因为只获取了热门城市
最后再测试下正常的请求
后期还会做web可视化,以及登陆抢票接口,未完待续.......
完整代码github地址:https://github.com/py3study/12306_ticket_inquiry