前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >今年,我只赚了一点点

今年,我只赚了一点点

作者头像
Jack_Cui
发布2022-12-22 18:29:50
5510
发布2022-12-22 18:29:50
举报
文章被收录于专栏:Jack-CuiJack-Cui

大家好,我是 Jack。

之前一直有小伙伴问我,有没有免费的股票信息查询的 API 接口?

我看了一圈,很多免费的 API 接口都年久失修,失效了。

那好吧,咱自己写一个

想要玩量化交易,第一步,那得有稳定的股票数据来源。

然后再谈什么量化策略,怎么选股、何时买股。

怎么稳定的获取数据呢?

只能是抄起我的老板行,写个网络爬虫,自动抓取数据

玩股票、玩基金的,应该多多少听过一款股票交流 APP 雪球。

这里面的数据很全,就它了!

前方提醒:使用网络爬虫,请控制好访问频率。

在雪球上,想要获得各种股票信息,那需要携带身份信息,也就是要有 Cookie。

没有 Cookie,很多信息是获取不到的。

2017 年的时候,我就写过关于 Cookie 的文章。

一些基础知识忘记的小伙伴,可以重温下我这个系列的文章。

网络爬虫教程(2020年)

想要获取 Cookie,那就需要进行模拟登录。

模拟登录 - 准备篇

模拟登录,顾名思义,就是模拟人类的行为,登录这个网站。

登录之后,我们就可以用保存身份信息的 Cookie,获取我们想要的各种数据:股票信息、基金信息等。

我们先手动登录,体验一下整个登录流程。

手动登录

第一步:点击登录按钮。

第二步:输入帐号和密码,并点击登录。

第三步:解锁滑块。

第四步:登录成功。

模拟登录

接下来,就是需要写个代码,让代码替我们完成上述操作

这里我使用 Selenium,它是一款自动化测试工具。

不过说实话,Selenium 这东西挺老了。

现在有不少更好的工具,不过对于模拟登录的知识储备,我还停留在 2017 年,也只会用它了。

有更好更好的方法的话,欢迎小伙伴们提交 PR。

不过,好在 Selenium 虽然老了点,但还能勉强胜任获取 Cookie 这项工作的。

Selenium 不会的小伙伴,可以看我从前的教程:

https://jackcui.blog.csdn.net/article/details/72331737

想要使用 Selenium,首先需要下载浏览器驱动,这里以 Chrome 浏览器为例。

打开 Chrome 浏览器,查看 Chrome 版本号。

然后根据这个版本号,下载相同大版本的驱动。

http://chromedriver.storage.googleapis.com/index.html

根据自己的操作系统,选择对应的版本。

我的是 Windows 电脑,选择 Win32 的版本。

下载好后,解压备用。

最后安装 Selenium 第三方依赖库。

代码语言:javascript
复制
python -m pip install selenium==3.4 --user

注意,需要安装 3.4 的版本,Selenium 的新版本改动较多,用我的代码会存在接口不兼容的情况。

模拟登录 - 实战篇

我们先睹为快,看下让代码自动登录雪球的效果:

https://cuijiahua.com/wp-content/uploads/2022/12/1.mp4

(PS:录屏时间 12.2,由于大家都知道的原因,页面为黑白)

其实模拟登录的思路很简单,就是根据审查元素,找到各个元素的位置。

比如登录按钮,右键审查元素,然后选择 Copy Xpath。

就能拷贝路径地址。

使用这种方法,找到帐号输入框、密码输入框的位置,然后点击登录即可。

这里的难点在于验证码。

不过好在,GEETEST 验证码的破解,我还是有些经验的,17 年的时候,就写过相关内容。

很多代码,直接复用即可

整体思路就是:

  • 使用Selenium打开页面。
  • 匹配到输入框,输入账号密码,点击登录。
  • 读取验证码图片,并做缺口识别。
  • 根据缺口位置,计算滑动距离。
  • 根据滑动距离,拖拽滑块到需要匹配的位置。

直接放代码:

代码语言:javascript
复制
from selenium import webdriver
from selenium.webdriver import ActionChains
from io import BytesIO
import json
import base64
import time
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from time import sleep
from PIL import Image
from selenium import webdriver

# 账号
USERNAME = '***'
# 密码
PASSWORD = '***'
BORDER = 6


class Login(object):
    def __init__(self):
        self.url = 'https://xueqiu.com/'

        opt = webdriver.ChromeOptions()
        opt.add_experimental_option('w3c', False)
        self.browser = webdriver.Chrome("chromedriver.exe", chrome_options=opt)
        self.browser.maximize_window()#第一处修复,设置浏览器全屏
        self.username = USERNAME
        self.password = PASSWORD
        self.wait = WebDriverWait(self.browser, 20)

    def __del__(self):
        print("close")

    def open(self):
        self.browser.get(self.url)
        ele = self.browser.find_element_by_xpath('//*[@id="app"]/nav/div[1]/div[2]/div/div')#第二处修复,改xpath
        ele.click()
        username = self.wait.until(EC.presence_of_element_located((By.XPATH, '//input[@name="username"]')))
        pwd = self.wait.until(EC.presence_of_element_located((By.XPATH, '//input[@name="password"]')))
        username.send_keys(self.username)
        time.sleep(2)
        pwd.send_keys(self.password)

    # 获取验证码按钮
    def get_yzm_button(self):
        button = self.wait.until(EC.presence_of_element_located((By.XPATH, '/html/body/div[2]/div[1]/div/div/div/div[2]/div[2]/div[2]')))#第三处修复,改xpath
        return button

    # 获取验证码图片对象
    def get_img_element(self):
        element = self.wait.until(EC.presence_of_element_located((By.XPATH, '//cavas[@name="geetest_canvas_bg geetest_absolute"]')))
        return element

    def get_position(self):
        # 获取验证码位置
        element = self.get_img_element()
        sleep(2)
        location = element.location
        size = element.size
        top, bottom, left, right = location['y'], location['y'] + size['height'], location['x'], location['x'] + size[
            'width']
        return left, top, right, bottom

    def get_geetest_image(self):
        """
        获取验证码图片
        :return: 图片对象
        """
        '''
        <canvas class="geetest_canvas_bg geetest_absolute" height="160" width="260"></canvas>
        '''
        # 带阴影的图片
        # im = self.wait.until(EC.presence_of_element_located((By.XPATH, '/html/body/div[4]/div[2]/div[6]/div/div[1]/div[1]/div/a/div[1]/div/canvas[1]')))
        im = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '.geetest_canvas_bg')))
        time.sleep(2)
        im.screenshot('captcha.png')
        # 执行 JS 代码并拿到图片 base64 数据
        JS = 'return document.getElementsByClassName("geetest_canvas_fullbg")[0].toDataURL("image/png");'  # 不带阴影的完整图片
        im_info = self.browser.execute_script(JS)  # 执行js文件得到带图片信息的图片数据
        # 拿到base64编码的图片信息
        im_base64 = im_info.split(',')[1]
        # 转为bytes类型
        captcha1 = base64.b64decode(im_base64)
        # 将图片保存在本地
        with open('captcha1.png', 'wb') as f:
            f.write(captcha1)

        JS = 'return document.getElementsByClassName("geetest_canvas_bg")[0].toDataURL("image/png");'
        # 执行 JS 代码并拿到图片 base64 数据ng  # 带阴影的图片
        im_info = self.browser.execute_script(JS)  # 执行js文件得到带图片信息的图片数据
        # 拿到base64编码的图片信息
        im_base64 = im_info.split(',')[1]
        # 转为bytes类型
        captcha2 = base64.b64decode(im_base64)
        # 将图片保存在本地
        with open('captcha2.png', 'wb') as f:
            f.write(captcha2)

        captcha1 = Image.open('captcha1.png')
        captcha2 = Image.open('captcha2.png')
        return captcha1, captcha2

    # 获取网页截图
    def get_screen_shot(self):
        screen_shot = self.browser.get_screenshot_as_png()
        screen_shot = Image.open(BytesIO(screen_shot))
        return screen_shot

    def get_yzm_img(self, name='captcha.png'):
        #  获取验证码图片
        left, top, right, bottom = self.get_position()
        print('验证码位置', top, bottom, left, right)
        screen_shot = self.get_screen_shot()
        captcha = screen_shot.crop((left, top, right, bottom))
        captcha.save(name)
        return captcha

    def get_slider(self):
        # 获取滑块
        # :return: 滑块对象
        slider = self.wait.until(EC.element_to_be_clickable((By.CLASS_NAME, 'geetest_slider_track')))
        return slider

    def get_gap(self, image1, image2):
        """
        获取缺口偏移量
        :param image1: 不带缺口图片
        :param image2: 带缺口图片
        :return:
        """
        left = 62
        for i in range(left, image1.size[0]):
            for j in range(image1.size[1]):
                if not self.is_pixel_equal(image1, image2, i, j):
                    left = i
                    return left
        # return left

    def is_pixel_equal(self, image1, image2, x, y):
        """
        判断两个像素是否相同
        :param image1: 图片1
        :param image2: 图片2
        :param x: 位置x
        :param y: 位置y
        :return: 像素是否相同
        """
        # 取两个图片的像素点
        pixel1 = image1.load()[x, y]
        pixel2 = image2.load()[x, y]
        threshold = 60
        if abs(pixel1[0] - pixel2[0]) < threshold and abs(pixel1[1] - pixel2[1]) < threshold and abs(
                pixel1[2] - pixel2[2]) < threshold:
            return True
        else:
            return False

    def get_track(self, distance):
        """
        根据偏移量获取移动轨迹
        :param distance: 偏移量
        :return: 移动轨迹
        """
        # 初速度
        v = 0
        # 单位时间为0.2s来统计轨迹,轨迹即0.2内的位移
        t = 0.3
        # 位移/轨迹列表,列表内的一个元素代表0.2s的位移
        tracks = []
        # 当前的位移
        current = 5
        # 到达mid值开始减速
        mid = distance * 3 / 5
        while current < distance:
            if current < mid:
                # 加速度越小,单位时间的位移越小,模拟的轨迹就越多越详细
                a = 2
            else:
                a = -3
            # 初速度
            v0 = v
            # 0.2秒时间内的位移
            s = v0 * t + 0.4 * a * (t ** 2)
            # 当前的位置
            current += s
            # 添加到轨迹列表
            tracks.append(round(s))
            # 速度已经达到v,该速度作为下次的初速度
            v = v0 + a * t
        return tracks

    def move_to_gap(self, slider, track):
        """
        拖动滑块到缺口处
        :param slider: 滑块
        :param track: 轨迹
        :return:
        """
        ActionChains(self.browser).click_and_hold(slider).perform()
        for x in track:
            ActionChains(self.browser).move_by_offset(xoffset=x, yoffset=0).perform()
        time.sleep(0.5)
        ActionChains(self.browser).release().perform()

    def shake_mouse(self):
        """
        模拟人手释放鼠标抖动
        :return: None
        """
        ActionChains(self.browser).move_by_offset(xoffset=-2, yoffset=0).perform()
        ActionChains(self.browser).move_by_offset(xoffset=2, yoffset=0).perform()

    def operate_slider(self, track):
        '''
        拖动滑块
        '''
        # 获取拖动按钮
        back_tracks = [-1,-1, -1, -1]
        slider_bt = self.browser.find_element_by_xpath('//div[@class="geetest_slider_button"]')

        # 点击拖动验证码的按钮不放
        ActionChains(self.browser).click_and_hold(slider_bt).perform()

        # 按正向轨迹移动
        for i in track:
            ActionChains(self.browser).move_by_offset(xoffset=i, yoffset=0).perform()

        time.sleep(1)
        ActionChains(self.browser).release().perform()

    def get_cookies(self):
        try:
            cookie_list = self.browser.get_cookies()
            cookie_dict = {i['name']: i['value'] for i in cookie_list}
            with open('xueqiu_cookies', 'w', encoding='utf8')as f:
                cookie_dict = json.dumps(cookie_dict)
                f.write(cookie_dict)

            return cookie_dict
        except:
            print("cookie 获取失败")
            return None

    # 读取cookie
    def return_cookie(self):
        cookies = ''
        with open('xueqiu_cookies', 'r')as f:
            cookie = f.read()[1:-1]
            cookie = cookie.split(', ')
            for i in cookie:
                cook = i.split(': ')
                cookies += cook[0][1:-1] + '=' + cook[1][1:-1] + ';'
            return cookies

    def run(self):
        # 破解入口
        self.open(), sleep(3)
        self.get_yzm_button().click(), sleep(2)# 点击验证按钮
        # 点按呼出缺口
        slider = self.get_slider()
        # slider.click()
        # 获取带缺口的验证码图片
        image1, image2 = self.get_geetest_image()
        gap = self.get_gap(image1, image2)
        print('缺口位置', gap)
        track = self.get_track(gap)
        print('滑动轨迹', track)
        self.operate_slider(track)
        # 判定是否成功
        time.sleep(8)
        try:
            elem = self.wait.until(
                EC.text_to_be_present_in_element((By.CLASS_NAME, 'nav__btn--longtext'), '发帖'))
            if elem:
                cookie = self.get_cookies()
            else:
                print("get cookies errors")
        except Exception as e:
            print(e, 'fail! ')
            time.sleep(3)
            self.run()
        finally:
            self.browser.quit()


if __name__ == '__main__':
    crack = Login()
    crack.run()

代码我也上传到 Github 上了,代码的后续更新维护会放在这里建议 Star 收藏下

https://github.com/Jack-Cherish/quantitative

数据获取

等待模拟登录完成后,会保存一个名为 xueqiu_cookies 的文件。

这里保存的是帐号的 Cookie,使用这个 Cookie 就能获取雪球的数据了。

比如,获取一下股票实时行情和现金流量表,就可以这样写:

代码语言:javascript
复制
#-*- coding:utf-8 -*-
import requests
import json

def fetch(url, xq_a_token):
    headers = {'Host': 'stock.xueqiu.com',
        'Accept': 'application/json',
        'Cookie': 'xq_a_token={};'.format(xq_a_token),
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36',
        'Accept-Language': 'zh-Hans-CN;q=1, ja-JP;q=0.9',
        'Accept-Encoding': 'br, gzip, deflate',
        'Connection': 'keep-alive'}
    response = requests.get(url, headers = headers)

    if response.status_code != 200:
        raise Exception(response.content)

    return json.loads(response.content)

if __name__ == '__main__':
    # 获取股票 SH600000 实时行情
    url = "http://stock.xueqiu.com/v5/stock/quote.json?extend=detail&symbol=SH600000&count=10"
    with open("xueqiu_cookies", "r") as f:
        cookies_info = json.load(f)
    res = fetch(url, cookies_info['xq_a_token'])
    print(res)
    # 获取股票 SH600000 现金流量表
    url = "http://stock.xueqiu.com/v5/stock/finance/cn/cash_flow.json?symbol=SH600000&count=10"
    with open("xueqiu_cookies", "r") as f:
        cookies_info = json.load(f)
    res = fetch(url, cookies_info['xq_a_token'])
    print(res)

运行结果:

有了 Cookie,很多接口数据都能获取,实时行情、实时分笔、业绩预告、机构评级、资金流向趋势、资金流向历史、资金成交分布、大宗交易、融资融券、业绩指标、利润表、资产负债表、现金流量表、主营业务构成、F10 十大股东、F10 主要指标等等。

这些数据,都能获取。

絮叨

篇幅有限,今天就是带大家小小实战下。

后续我会完善各个常用查询接口,方便大家获取各类数据,用于量化分析。

万事开头难,先弄好数据,再看量化策略~

如果喜欢这类的内容,记得点赞,喜欢的人多的话,我会快速加更的~

最后必须提醒一下各位:

获取数据,请温柔,请勿高并发获取,且用且珍惜。

对了,还有不少小伙伴问我,我的量化策略收益如何。

去年的五万元实验,最后浮盈不到 10%,清仓之后就换新的策略实验了。

6月份的时候,又用上了新策略,新的策略一直跑到今年 10 月份,也就这样:

实验没放多少钱,随便玩玩,你觉得,这点收益如何?

好了,今天就聊这么多吧,我是 Jack,我们下期见~

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-12-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 模拟登录 - 准备篇
    • 手动登录
      • 模拟登录
      • 模拟登录 - 实战篇
      • 数据获取
      • 絮叨
      相关产品与服务
      验证码
      腾讯云新一代行为验证码(Captcha),基于十道安全栅栏, 为网页、App、小程序开发者打造立体、全面的人机验证。最大程度保护注册登录、活动秒杀、点赞发帖、数据保护等各大场景下业务安全的同时,提供更精细化的用户体验。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档