专栏首页量化投资与机器学习量化A股舆情:基于Kafka+Faust的实时新闻流解析

量化A股舆情:基于Kafka+Faust的实时新闻流解析

实时新闻流数据

新闻消息瞬息万变,新闻舆情也对股票市场产生了明显的影响,实时新闻流数据能够为量化交易带来更多的应用场景,比如盘中的风险监控、实时的情绪及热度统计、事件驱动交易等。

ChinaScope近期上线了基于Kafka的实时新闻流数据——SmarTag Stream,公众号第一时间申请到了试用权限,接下来,大家跟着编辑部一起,一路从kafka的消息流,到基于流处理框架Faust实现的股票实时监控,来探索一下新闻流数据在量化场景的潜在应用。

首先简单介绍一下新闻数据的结构,SmarTag对每天新闻进行结构化处理,首先会提取新闻中的标签,其次会对新闻及新闻中的公司人物的进行情绪分析,最终会以Json格式的推送处理完的新闻结构化数据,该数据中有三个属性:

  • newsInfo:新闻基础信息
  • emotionInfos:情绪信息
  • newsTags:标签信息

各属性内的比较重要(并非全部)的字段如下:

Kafka消息流的几个核心概念

  • Producer:消息的生产者
  • Broker:Broker是Kafka的实例,每个服务器有一个或多个实例。
  • Consumer:消息的消费者,Kafka把新闻流从服务端推送到客户端,从而使我们消费(或处理)这个消息
  • Topic题:消息的主题,可以理解为消息的分类,客户端通过订阅Topic,接收对应Topic的消息。
  • Partition:Topic的分区,主要作用是负载均衡,作为消息的消费者,无需过多的关注Partition,这只是一个物理上的概念,对最终接收的消息没有任何影响。
  • Group:用于对于消费者进行分组。

作为技术小白,我们只需要理解,kafka是用来从服务端到客户端推送消息的。消息的生产者Producer产生消息后,放入对应Topic的消息队列,Broker会对这些消息进行分发推送,客户端的消费者Consumer(需订阅该Topic)接收到消息后进行处理及应用。

用Python接收新闻流数据

对于技术小白们来说,Kafka的运行机制并不需要详细的了解,只需要关心,怎么才能接收到数据?以及接收到数据之后,该怎么处理?

我们首先来解答第一个问题,怎么接受数据?我们以小白标配语言Python为例,Python里有好几个kafka的工具包,包括python-kafka, aiokafka等,我们这里以python-kafka为例。安装很简单:

pip install python-kafka

安装完成之后,我们导入KafkaConsumer,并进行相关配置,主要配置项包括Topic、Group、Broker及登录用户名密码:

# encoding utf-8
from kafka import KafkaConsumer # 注意这里是kafka,不是python-kafka
import json

def get_news_stream(start_from_newest=True):
    """
    :param start_from_newest: False, 从上次结束消费的地方接收信息,True, 从最新的消息开始接收
    :return:
    """
    consumer = KafkaConsumer('TOPIC_NAME',
                             group_id='GROUP_ID',
                             enable_auto_commit=True,
                             auto_commit_interval_ms=1000,
                             auto_offset_reset='latest',
                             security_protocol='SASL_PLAINTEXT',
                             sasl_mechanism="SCRAM-SHA-512",
                             sasl_plain_username="USERNAME",
                             sasl_plain_password="PASSWORD",
                             bootstrap_servers=['broker1.xxxxxx.com:19092',
 'broker2.xxxxxx.com:19092']
                             )
    if start_from_newest:
        consumer.poll(10)
        consumer.seek_to_end()
    for msg in consumer:
        print(json.loads(msg.value)) # 可以加入对于新闻处理的逻辑,这里只是简单Print

if __name__ == '__main__':
    get_news_stream()

关于以上示例代码有几点需要说明,首先是几个参数的意义:

  • enable_auto_commit = True, 开启自动确认,向服务端确认以收到消息,Consumer在接收到消息后,会跟服务端确认收到消息,如果此时断开,下一次会从断开的地方重新接收消息。
  • auto_commit_interval_ms=1000, 自动提交的间隔,每1秒确认一次
  • auto_offset_reset='lastest', 是从上次断开的地方接收消息,如果是'earliest', 则从最早的消息开始接收。
  • bootstrap_servers: 填入的是broker服务器的地址
  • 其他参数为登录验证的设置

还有几个细节需要说明:

  • 因为新闻流在不断的推送,如果中途断开再重新接入后,面临两种选择:
    • 从断的地方开始消费,这种情况多见于数据落地,需要中间不丢包,保证数据完成
    • 从最新的消息开始消费,这种情况多见于需要使用最新数据计算交易信号等场景,在上面的例子中,我们把start_from_newest设置为True后,每次断开,都从最新的新闻流开始消费。比如,上次断开是14:20,重新连上后是14:30,那么最新接到的消息就是14:30之后的消息。
    • KafkaConsumer有这样一个函数支持直接跳到最后,就是seek_to_end,但直接使用会 报错,在之前还需要poll一次数据。
  • 代码中的for循环用于不断的接收消息,然后处理,由于消息以二进制的形式接收过来,所以需要进行序列化,比如这里原消息是Json格式的,这里就使用json.loads把字符串转为dict。接下来,就是将数据存到数据库,或者进行相应的计算。比如计算实时的股票情绪或市场情绪等。

大家也注意到,这里收到消息进行处理完之后,才能消费下一条消息,就容易造成队列的阻塞。如果是简单的处理逻辑,倒不至于阻塞。但如果处理逻辑过于复杂,那下一条消息就必须等待上一条消息处理完成后才能被消费。这种情况下,我们需要一个异步处理的流程,也就是消息的接收和处理分为不同的进程,相互独立。

关于流处理

流是一种数据传送技术,它把客户端数据变成一个稳定的流。正是由于数据传送呈现连续不停的形态,所以流引擎需要连续不断处理数据。比如实时高频的股票行情数据就可以看成是一个数据流,基于实时高频数据产生交易信号的过程就可以看做是一个流处理的过程。

与流处理对应的数据处理方式是批处理,传统的数据融合通常基于批模式。在批的模式下,我们会通过一些周期性运行的ETL JOB,将数据从关系型数据库、文件存储向下游的目标数据库进行同步,中间可能有各种类型的转换。

批处理一般是解决离线计算数据量大,计算时间慢的问题,流处理相反是为了解决实时计算或是近实时计算问题,当然有了实时的要求就会使处理的数据量变少,但是计算速度要求更快,两者相同点都要求数据计算的准确性有保障。

常见的流处理框架包括Kafka Streams、Apache Storm、Spark Stream、Samza及大名鼎鼎的Apache Flink,成熟的流处理框架在容错性、状态管理及性能上都有很大的保障。小编也看到很多量化私募开始招聘掌握Flink的技术大咖。当然,这些成熟的大框架不是小编今天的目标,只会用Python的小编当然搞不懂这些。

所以问题就来了,了解了流处理之后,Python中有没有好用的流处理框架,而且是支持Kafka的?当然有,这就是Faust~

基于Faust的新闻流处理

Faust是一个轻量的流处理框架,非常适合小型量化团队对于流处理的需求。Faust是一个将Kafka Streams的概念移植到Python的第三方库,安装Faust时需要注意安装的是faust-streaming,而不是faust,使用以下代码安装:

pip install faust-streaming # 注意不是pip install faust

接着我们通过一段简单的示例代码来了解如何通过Faust接入实时新闻流:

# news_stream.py
import faust

# 定义App
app = faust.App('GROUP_ID',
                value_serializer='json',
                topic_allow_declare=False,
                topic_disable_leader=True,
                broker=['kafka://broker1.xxxxxx.com:19092',
                        'kafka://broker2.xxxxxx.com:19092'],
                broker_credentials=faust.SASLCredentials(
                    username='USERNAME',
                    password='PASSWORD',
                    mechanism='SCRAM-SHA-512')
               )
               

# 订阅Topic
news_kafka_topic = app.topic('TOPIC_NAME')

# 针对Topic的处理函数
@app.agent(news_kafka_topic)
async def process(stream) -> None:
    async for event in news_items.events():
        print('News:{}'.format(event.message.timestamp))
        print(event.value)

if __name__ == '__main__':
    app.main()

首先实例化一个Faust流处理的应用程序,faust.App(),其中相关参数解释如下:

  • 位置参数'GROUP_ID'
  • value_serializer: 序列化工具,在python-kafka中,我们需要自己用json进行序列化,在这里只需要在参数中设定好,框架会自动将消息中的vlaue进行序列化处理。
  • topic_allow_declare:如果只是单纯的消费消息,没有创建topic的权限的话,必须设置为False
  • topic_disable_leader:Faust默认会在订阅其他Topic时创建一个leader topic, 如果只是单纯的消费消息,没有创建topic的权限的话,必须设置为True
  • boker: 必须带上“kafka://”的前缀
  • broker_credentials:登录信息设置

然后实例化Topic,news_kafka_topic = app.topic('TOPIC_NAME')。其中TOPIC_NAME为订阅的topic,必须与kafka消息流的Topic名称保持一致。

Faust代理是一个流处理器,它订阅一个主题并处理每条消息。在Faust中,代理(Agent)用于装饰异步函数,可以并行处理无限数据流。该代理用作您的处理函数的装饰器,异步函数必须使用异步for循环遍历数据流。对于同一个Topic,可以同时有多个Agent对其进行消息处理。

然后通过命令行,切换到当前py文件的目录,启动app,就可以实时接收并异步处理新闻数据了:

faust -A news_stream woker -l info #news_stream

‍自选股负面舆情监控

对于关注的股票列表进行舆情监控,如果出现显著负面的新闻,则发送邮件。具体的示例说明如下:

  • 关注列表:宁德时代、比亚迪、中国联通、科大讯飞
  • 提醒条件:新闻相关度大于0.5,且负面情绪值大于0.7
  • 时间范围:全天实时监控
  • 提醒方式:邮件发送
# encoding=utf-8
from abc import ABC
import faust
import smtplib
from email.mime.text import MIMEText

class NewsItem(faust.Record, ABC, serializer='json'):
    newsInfo: dict
    newsTags: list
    emotionInfos: list

def send_mail(content):
    # 邮件构建
    subject = "负面新闻预警"  # 邮件标题
    sender = "*********@163.com"  # 发送方
    recver = "*******@163.com"  # 接收方
    password = "*****"
    message = MIMEText(content, "plain", "utf-8")
    # content 发送内容 "plain"文本格式 utf-8 编码格式
    message['Subject'] = subject # 邮件标题
    message['To'] = recver # 收件人
    message['From'] = sender # 发件人
    smtp = smtplib.SMTP_SSL("smtp.163.com", 994) # 实例化smtp服务器
    smtp.login(sender, password) # 发件人登录
    smtp.sendmail(sender, [recver], message.as_string()) # as_string 对 message 的消息进行了封装
    smtp.close()

def news_alert(news_time, news_value):
    sec_codes = ['300750_SZ_EQ', '002594_SZ_EQ', '600050_SH_EQ', '002230_SZ_EQ']
    news_info = news_value.newsInfo
    tag_info = news_value.newsTags
    emotion_info = news_value.emotionInfos
    for tag in tag_info:
        if tag['entityCode'] in sec_codes and tag['itemRelevance']>=0.5:
            for emotion in emotion_info:
                if emotion['entityCode'] == tag['entityCode'] and emotion['emotionIndicator']==2 and emotion['emotionWeight']>=0.8:
                    content = "{}【负面预警】:{}-{} \n 【新闻摘要】{}".format(news_time, tag['entityCode'], emotion['emotionWeight'], news_info['newsSummary'])
    send_mail(content)

app = faust.App('GROUP_NAME',
                value_serializer='json',
                topic_allow_declare=False,
                topic_disable_leader=True,
                broker=['kafka://xxxx.com:19092',
                        'kafka://yyyy.com:19092'],
                broker_credentials=faust.SASLCredentials(
                    username='USERNAME',
                    password='PASSWORD',
                    mechanism='SCRAM-SHA-512')
                )
news_kafka_topic = app.topic('TOPIC_NAME', value_type=NewsItem)

@app.agent(news_kafka_topic)
async def process(news) -> None:
    async for news_item in news.events():
        news_alert(news_item.message.timestamp, news_item.value)

if __name__ == '__main__':
    app.main()

运行以上代码,会实时监控关注股票的负面新闻,并发送邮件通知,相关代码说明如下:

  • 类NewsItem是对消息流中的新闻数据构建了一个数据结构模型,这样能够更方便的访问数据内的字段,然后要在topic内声明value_type=NewsItem。
  • news_alert函数完成了数据的分析,首先检查了新闻关联的股票,如果有关注的股票,则再检查其情绪是否是负面且大于0.7
  • 如果发现负面新闻,则通过send_mail函数发送邮件。

总结

Faust流处理框架极大的方便了新闻流数据处理的效率,不用在担心各种细节。

Faust还有各种丰富的功能,包括数据中间表Tables、数据持续化、过滤器Filters等。基于这些功能,我们可以很快速的实现,如全市场股票情绪的监控,实时个股情绪因子及市场情绪指数的计算

最后,公众号认为,无论是大机构还是小机构,积极拥抱不同的IT技术,以工程式的思维对待量化策略研究与开发一定是一条可持续的道路。加油!

有需要试用新闻流的小伙伴,请点击https://www.chinascope.com/tryout.html

本文分享自微信公众号 - 量化投资与机器学习(Lhtz_Jqxx),作者:全网Quant都在看

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2021-07-21

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Python流处理Python

    它被用于Robinhood去构建高性能的分布式系统和实时数据通道,每天处理数十亿的数据。

    py3study
  • 厉害了!新闻情绪因子

    2、智能标签识别:识别新闻中存在的法人及自然人实体、SAM产品、行业、事件及概念。除了识别出这些标签,算法还能给出这篇新闻与这些标签的相关程度。

    量化投资与机器学习微信公众号
  • 自然语言处理在金融实时事件监测和财务快讯中的应用

    疫情之下,全球金融市场进入大波动时代,各国金融调控政策、突发事件层出不穷,例如美联储无限量QE、欧央行7500亿复苏基金、中美关闭使领馆、阿塞拜疆和亚美尼亚爆发...

    zenRRan
  • 用Python搭建一个股票舆情分析系统

    下面的这篇文章将手把手教大家搭建一个简单的股票舆情分析系统,其中将先通过金融界网站爬取指定股票在一段时间的新闻,然后通过百度情感分析接口,用于评估指定股票的正面...

    Python编程与实战
  • 另类数据:不会用不代表不好用!

    管理着1040亿美元的对冲基金Man Group的数据科学主管Hinesh Kalian表示:“在过去6个月,我们对另类数据的需求飙升,越来越多的另类数据提供商...

    量化投资与机器学习微信公众号
  • 大数据24小时 | Q9 Networks易主,特斯拉与“老搭档”分道扬镳,下一秒发生什么谁都说不好!

    <数据猿导读> 前一秒,特斯拉与MobileEye还是兄弟情深,结果现已分道扬镳。现如今,各行各业都随时面临着更新迭代,火热的大数据领域更为尤甚,其变化之快,着...

    数据猿
  • 【贝叶斯系列】在研究机构如何应用贝叶方法论进行量化投资

    贝叶斯方法与量化投资 贝叶斯方法在量化投资中有哪些应用股票分类 市场趋势识别 波动率估计 投资组合风险 股票分类 构造投资组合的方法是买入好的 股票(未来收...

    量化投资与机器学习微信公众号
  • 大数据周周看 | 行业大牛不甘平淡忙创业,戴尔天价收购背后竟是数千人的失业判决书

    <数据猿导读> Dell公司宣布裁员至少两千人;紫光股份与西部数据拟出资10亿元建立大数据公司;原中国移动研究院专家王帅宇加盟北京供销大数据集团,出任CTO一职...

    数据猿
  • 如何为智能投顾打造对话系统?这有一份指南可供参考

    主讲人:灵智优诺CTO 许可 屈鑫 颜萌 整理编辑 量子位 出品 | 公众号 QbitAI 作为人工智能和语言学的重要分支,自然语言处理(NLP)的相关研究一直...

    量子位
  • 数据为谁而用?——人性化交互金融知识图谱问答探索

    《经济学人》五月第一期杂志中,头条新闻中将数据比作世界上最有价值的资源。早在2014年马云董事长就说过阿里巴巴从本质上来讲已经成为一家数据公司。文章[4]中说现...

    华章科技
  • 从两会看中国大数据的应用

    大数据正在加速落地。中国政府出台大数据刺激计划只是时间早晚问题,企业家正在通过各种渠道去影响政府,希望其尽快承担起大数据开放和利用的牵头职责。 大数据从民到官...

    罗超频道
  • 【金猿案例展】某基金管理公司:智能量化投资平台建设

    本案例由九章云极投递并参与评选,数据猿独家全网首发;更多关于【金猿榜/奖·2019征集评选】的相关信息,请点击这里了解详情丨征案例、征文章、征产品=评企业、评人...

    数据猿
  • 『金融数据结构』「1. 数据类型」

    声明:这个 AFML 系列终于开始硬核将机器学习和量化投资结合在一起了,而且 Python 系列也派上用场了。

    用户5753894
  • 无敌了!新闻情绪因子进阶来啦!

    我们详细分析对比了采用不同情绪得分计算方法的因子表现。从而得出一个很重要且结论:即情绪因子构建时应该考虑新闻与股票的相关度即情绪得分的时间衰减。基于以上的结论,...

    量化投资与机器学习微信公众号
  • 社交数据平台DataSift(服务领域:数据即服务)

    社交数据平台DataSift帮助“开发商以及第三方”访问Twitter,Facebook及其它社交数据资源,Datasift能够对海量社交数据进行分析,向品牌公...

    华章科技
  • 干货 | 舆论事件频发 大数据如何引导网络舆情

    维克托·迈尔·舍恩伯格在其著作《大数据时代》中提到,大数据带来的信息风暴正在变革我们的生活、工作和思维,大数据开启了一次重大的时代转型。 奥巴马政府将大数据定...

    灯塔大数据
  • 重磅来袭 | AFML系列开启,我们相信会成为经典~

    AFML系列 终于开始硬核将机器学习和量化投资结合在一起了,而且 Python 系列也派上用场了。

    量化投资与机器学习微信公众号
  • JPMorgan最新报告解读:基于NLP的A股交易策略(附下载)

    今天,公众号刚刚通过ChinaScope(数库)拿到了J.P. Morgan(摩根大通)关于中国A股市场的量化研究报告:

    量化投资与机器学习微信公众号
  • 从T+1到T+0,浅谈PetaBase的实时流式处理

    随着互联网+的进一步发展,各行业对大数据技术的应用日趋成熟,企业的信息化范围正在高速扩展。

    数据狗忙忙忙

扫码关注云+社区

领取腾讯云代金券