前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >20.multi_case07

20.multi_case07

作者头像
hankleo
发布2020-09-17 11:07:21
4010
发布2020-09-17 11:07:21
举报
文章被收录于专栏:Hank’s Blog
代码语言:python
复制
# coding:utf-8

import re
import ssl
import csv
import json
import time
import random
import asyncio
import aiohttp
import requests
from lxml import etree
from asyncio.queues import Queue
from aiosocksy import Socks5Auth
from aiosocksy.connector import ProxyConnector, ProxyClientRequest


class Common:
    """Shared crawler state, held as class attributes.

    Nothing instantiates this class; every coroutine reads and writes the
    same objects through ``Common.<name>``.
    """

    # Total market capitalisation; overwritten by get_marketcap().
    market_cap_all = 0
    # Exchange-rate figure; overwritten by get_currency_rate() and used as
    # the divisor for the 24h volume in parse_html().
    currency_rate = 0

    # CSV rows waiting to be crawled (filled by main()).
    task_queue = Queue()
    # Parsed page results waiting to be pushed (drained by push_results()).
    result_queue = Queue()


# Production intranet SOCKS5 proxies; download() picks one at random when
# DEPLOY_MODE == "Prod".
socks5_address_prod = [
    'socks5://10.1.100.253:1235',
    'socks5://10.1.100.51:1235',
    'socks5://10.1.100.70:1235',
    'socks5://10.1.100.205:1235',
    'socks5://10.1.100.73:1235'
]

# Office-network SOCKS5 proxies.
# NOTE(review): this list is not referenced anywhere in the visible code, and
# 'socks5://18.208.81.123:1235' appears twice -- confirm whether both are
# intended.
socks5_address_dev = [
    'socks5://18.208.81.123:1235',
    'socks5://34.197.217.25:1235',
    'socks5://52.20.255.43:1235',
    'socks5://34.237.163.87:1235',
    'socks5://18.208.81.123:1235',
    'socks5://52.0.114.155:1235'
]

# Deployment switch read by download(): "dev" uses no proxy, "Prod" routes
# requests through socks5_address_prod.
DEPLOY_MODE = "dev"


async def session_get(session, url, socks):
    """Fetch ``url`` with ``session`` and return ``(body_text, status)``.

    Args:
        session: client session used to issue the GET request.
        url: address to fetch.
        socks: SOCKS5 proxy URL forwarded as ``proxy=``, or ``None`` for a
            direct connection.

    Returns:
        A ``(text, status)`` tuple: the decoded response body and the HTTP
        status code.
    """
    auth = Socks5Auth(login='...', password='...')
    headers = {'User-Agent': 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'}
    timeout = aiohttp.ClientTimeout(total=20)
    # ``async with`` releases the response/connection even when reading the
    # body raises (for example on the 20 s total timeout).
    # NOTE(review): a bare ssl.SSLContext() is passed -- presumably to skip
    # certificate verification; confirm this is intended.
    async with session.get(
        url,
        proxy=socks,
        proxy_auth=auth,
        timeout=timeout,
        headers=headers,
        ssl=ssl.SSLContext()
    ) as response:
        return await response.text(), response.status


async def download(url):
    """Download ``url`` and return ``(body_text, status)``.

    A proxy from ``socks5_address_prod`` is chosen at random when
    ``DEPLOY_MODE == "Prod"``; every other mode connects without a proxy.

    Args:
        url: page address to fetch.

    Returns:
        The ``(text, status)`` tuple produced by ``session_get`` for the final
        request.
    """
    connector = ProxyConnector()
    # Default to "no proxy" so an unexpected DEPLOY_MODE value can no longer
    # leave ``socks`` unbound (previously an UnboundLocalError).
    socks = None
    if DEPLOY_MODE == "Prod":
        socks = random.choice(socks5_address_prod)
    async with aiohttp.ClientSession(
            connector=connector,
            request_class=ProxyClientRequest
    ) as session:
        ret, status = await session_get(session, url, socks)
        # A short body containing ``window.kk`` is treated as a redirect stub:
        # the real URL is read from the ``window.kk='...'`` assignment and
        # fetched once more.
        if 'window.kk' in ret and len(ret) < 1000:
            url = ret.split("window.kk='")[1].split("'")[0]
            ret, status = await session_get(session, url, socks)
        return ret, status


def _xpath_text(tree, path, index=0):
    """Return the stripped text of match number ``index`` for ``path``.

    Raises IndexError when the document has too few matches.
    """
    return tree.xpath(path)[index].strip()


async def parse_html(cid, url, response):
    """Extract coin metrics from the HTML text of a coin detail page.

    Every metric is optional: when a node is missing or its text cannot be
    converted, that key is simply left out of the result. Only ``Exception``
    is suppressed, so KeyboardInterrupt / SystemExit still propagate.

    Args:
        cid: coin identifier, copied into the result.
        url: page address, copied into the result.
        response: HTML text of the page.

    Returns:
        A dict with keys ``url``, ``cid``, ``time`` (Unix seconds) and
        ``value`` (a dict holding whichever metrics could be parsed).
    """
    coin_info = {'url': url, 'cid': cid, 'time': int(time.time())}
    coin_value = {}

    tree = etree.HTML(response)

    # Price: '$' is stripped; values containing '?' are skipped.
    try:
        price_usd = _xpath_text(
            tree,
            '//div[@class="priceInfo"]/div[@class="sub"]/span[1]/text()'
        ).replace('$', '')
        if '?' not in price_usd:
            coin_value['price'] = float(price_usd)
    except Exception:
        pass

    # Percentage change, stored as a float without the '%' sign.
    try:
        updown = _xpath_text(
            tree,
            '//div[@class="priceInfo"]/div[@class="sub smallfont"]/span[1]/text()'
        ).replace('%', '')
        coin_value['updown'] = float(updown)
    except Exception:
        pass

    # 24h volume: the page value (prefixed with '¥') is divided by
    # Common.currency_rate; a rate of 0 raises ZeroDivisionError and the
    # field is skipped.
    try:
        volume_24h_rmb = _xpath_text(
            tree,
            '//div[@class="info"]/div[@class="charCell"][2]/div[2]/span/text()'
        ).replace('¥', '').replace(',', '')
        coin_value['volume_24h'] = int(
            float(volume_24h_rmb) / Common.currency_rate)
    except Exception:
        pass

    # Circulating supply: leading digits of a "<digits><suffix>" string.
    # NOTE(review): the pattern requires at least one trailing word character,
    # so a bare number such as '12345' would yield '1234' -- confirm the page
    # always appends a unit/symbol.
    try:
        circulating_supply = _xpath_text(
            tree,
            '//div[@class="info"]//div[@class="charCell"][1]/div[@class="val"]/text()'
        ).replace(',', '')
        if '?' not in circulating_supply:
            circulating_supply = re.match(
                r'^(\d+)(\w+)$', circulating_supply).group(1)
            coin_value['circulating_supply'] = int(circulating_supply)
    except Exception:
        pass

    # Market cap is derived, and only when both inputs were parsed (KeyError
    # otherwise) and are non-zero.
    try:
        price = coin_value['price']
        supply = coin_value['circulating_supply']
        if price and supply:
            coin_value['market_cap'] = price * supply
    except Exception:
        pass

    # Share of the total market cap, formatted as a percentage string.
    try:
        if coin_value['market_cap']:
            global_share = coin_value['market_cap'] / Common.market_cap_all
            if global_share < 0.001:
                coin_value['global_share'] = '<0.1%'
            else:
                coin_value['global_share'] = str(
                    round(global_share * 100, 2)) + '%'
    except Exception:
        pass

    # Circulation rate and turnover rate are the first and second matches of
    # the same xpath.
    rate_path = (
        '//div[@class="info"]//div[@class="charbox"][1]/div[@class="val"]/text()'
    )
    try:
        circulation_rate = _xpath_text(tree, rate_path)
        if '?' not in circulation_rate:
            coin_value['circulation_rate'] = circulation_rate
    except Exception:
        pass

    try:
        turnover_rate = _xpath_text(tree, rate_path, 1)
        if '?' not in turnover_rate:
            coin_value['turnover_rate'] = turnover_rate
    except Exception:
        pass

    # Issue time: '-' is treated as "not available".
    try:
        issue_time = _xpath_text(
            tree,
            '//div[@class="infoList"]/div[1]/div[1]/span[2]/text()'
        )
        if issue_time != '-':
            coin_value['issue_time'] = issue_time
    except Exception:
        pass

    # Number of exchanges; the trailing '家' counter word is removed.
    try:
        exchange_num = _xpath_text(
            tree,
            '//div[@class="infoList"]/div[3]/div[1]/span[2]/text()'
        ).replace('家', '')
        coin_value['exchange_num'] = int(exchange_num)
    except Exception:
        pass

    try:
        total_circulation = _xpath_text(
            tree,
            '//div[@class="infoList"]/div[2]/div[2]/span[2]/text()'
        ).replace(',', '')
        coin_value['total_circulation'] = int(total_circulation)
    except Exception:
        pass

    coin_info['value'] = coin_value
    return coin_info


async def down_and_parse_task(queue):
    while True:
        try:
            cid, url = queue.get_nowait()[:2]
        except BaseException:
            return
        for retry_cnt in range(3):
            try:
                html, status = await download(url)
                if status != 200:
                    html, status = await download(url)
                if '访问控制拒绝了你的请求' in html:
                    html, status = await download(url)
                html_parse_result = await parse_html(cid, url, html)
                print(html_parse_result)
                await Common.result_queue.put(html_parse_result)
                break
            except BaseException:
                await asyncio.sleep(0.2)
                continue


async def push(data):
    """POST ``data`` (JSON-encoded) to the local collector, with retries.

    Makes up to three attempts and stops after the first one that succeeds.
    Failures are printed and never raised, so the function always returns
    ``None``.

    Args:
        data: JSON-serialisable payload.
    """
    url = 'http://127.0.0.1:8000/aaa'
    for _ in range(3):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                        url,
                        data=json.dumps(data)
                ) as response:
                    # Check the status while the response is still open.
                    response.raise_for_status()
            # Success: without this return the payload was posted three times.
            return
        except Exception as e:
            await asyncio.sleep(0.2)
            print(e)

async def speed_monitor():
    """Print the task-queue drain rate every 5 seconds until it is empty.

    The rate is ``(size before sleep - size after sleep) / 5``, i.e. items
    taken off ``Common.task_queue`` per second.
    """
    interval = 5
    pending = Common.task_queue.qsize()
    while pending != 0:
        await asyncio.sleep(interval)
        remaining = Common.task_queue.qsize()
        print('=================')
        print('speed = ', (pending - remaining) / 5)
        # No await happens between this sample and the loop test, so reusing
        # it matches re-reading the queue size.
        pending = remaining


async def monitor_finish():
    """Raise SystemExit once at most three tasks are left on the loop.

    Polls the task count once per second; when it drops to three or fewer it
    waits another 5 seconds and raises ``SystemExit``.

    NOTE(review): the threshold of 3 presumably stands for the long-running
    housekeeping tasks (this monitor, push_results, time_limit), and the 5 s
    grace period presumably lets push_results flush -- confirm.

    Raises:
        SystemExit: always, after the wait completes.
    """
    # asyncio.Task.all_tasks() was removed in Python 3.9; use the module-level
    # function when available and fall back on older interpreters.
    all_tasks = getattr(asyncio, 'all_tasks', None) or asyncio.Task.all_tasks
    while len(all_tasks()) > 3:
        await asyncio.sleep(1)
    await asyncio.sleep(5)
    raise SystemExit()


async def push_results():
    """Every 3 seconds, drain ``Common.result_queue`` and push the batch.

    Runs forever. Ordinary errors are printed with a traceback and the loop
    continues; cancellation and SystemExit / KeyboardInterrupt propagate so
    the task can be stopped.
    """
    temp_q = []
    while True:
        try:
            await asyncio.sleep(3)
            # Take only what is queued right now, so get() never blocks.
            for _ in range(Common.result_queue.qsize()):
                temp_q.append(await Common.result_queue.get())
            if temp_q:
                await push(temp_q)
                temp_q.clear()
        except asyncio.CancelledError:
            # CancelledError is an Exception subclass on Python 3.7; re-raise
            # it explicitly so cancellation always works.
            raise
        except Exception:
            import traceback
            print(traceback.format_exc())


async def get_marketcap():
    """Fetch the global market cap and store it in ``Common.market_cap_all``.

    Reads ``data.marketcapvol`` from the JSON response and stores it as an
    int.

    NOTE: ``requests`` is synchronous, so this call blocks the event loop
    until the response arrives or the timeout expires.

    Raises:
        requests.RequestException: on network failure or timeout.
        KeyError / ValueError: when the payload lacks the expected field or
            it is not numeric.
    """
    url = 'https://dncapi.feixiaohao.com/api/home/global?webp=0'
    # Bound the wait: without a timeout a stalled server would hang startup
    # forever.
    response = requests.get(url, timeout=20)
    response_json = json.loads(response.text)
    marketcap = response_json['data']['marketcapvol']
    Common.market_cap_all = int(marketcap)


async def get_currency_rate():
    """Fetch the exchange rate and store it in ``Common.currency_rate``.

    Stores the ``cny`` field of element 11 of the JSON array returned by the
    rate endpoint.

    NOTE(review): index 11 is hard-coded; presumably it selects the USD entry
    (parse_html divides a '¥' amount by this rate) -- confirm against the API
    payload.

    NOTE: ``requests`` is synchronous, so this call blocks the event loop
    until the response arrives or the timeout expires.

    Raises:
        requests.RequestException: on network failure or timeout.
        IndexError / KeyError: when the payload has an unexpected shape.
    """
    url_rate = 'https://dncapi.feixiaohao.com/api/coin/web-rate/'
    # Bound the wait: without a timeout a stalled server would hang startup
    # forever.
    response = requests.get(url_rate, timeout=20)
    currency_rate = json.loads(response.text)[11]['cny']
    Common.currency_rate = currency_rate

# Upper bound on crawl time: 300 seconds (this coroutine fires at 280 s).
async def time_limit():
    """Sleep for 280 seconds, then raise SystemExit."""
    budget_seconds = 280
    await asyncio.sleep(budget_seconds)
    raise SystemExit


async def main():
    """Load the crawl list, fetch global figures, and start all tasks.

    Steps:
      1. Queue every row of ``feixiaohao_mapping_data.csv`` whose second
         column starts with ``https``.
      2. Fetch the total market cap and the currency rate.
      3. Start 10 download workers plus one each of the finish monitor,
         speed monitor, result pusher and time limit.
    """
    # Resolve the loop here rather than relying on a module-level ``loop``
    # that only exists when the file is run as a script.
    loop = asyncio.get_event_loop()

    # ``with`` closes the file handle once the rows have been read.
    with open('feixiaohao_mapping_data.csv', encoding='utf-8') as csv_file:
        for row in csv.reader(csv_file):
            # Rows with fewer than two columns are skipped.
            if len(row) > 1 and row[1].startswith('https'):
                await Common.task_queue.put(row)
    print(Common.task_queue)

    await get_marketcap()
    print('总市值', Common.market_cap_all)

    await get_currency_rate()
    print('汇率', Common.currency_rate)

    for _ in range(10):
        loop.create_task(down_and_parse_task(Common.task_queue))
    # One instance each. These were previously created inside the loop above,
    # which started ten copies of every housekeeping task and kept the task
    # count permanently above monitor_finish()'s threshold of 3.
    loop.create_task(monitor_finish())
    loop.create_task(speed_monitor())
    loop.create_task(push_results())
    loop.create_task(time_limit())

if __name__ == '__main__':
    # Script entry point. ``loop`` is bound at module level here, so functions
    # in this file can reference it by name.
    loop = asyncio.get_event_loop()
    loop.create_task(main())
    # Nothing in this file calls loop.stop(); the visible shutdown path is the
    # SystemExit raised by monitor_finish() / time_limit().
    loop.run_forever()
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-05-06 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档