实时新闻流数据
新闻消息瞬息万变,新闻舆情也对股票市场产生了明显的影响,实时新闻流数据能够为量化交易带来更多的应用场景,比如盘中的风险监控、实时的情绪及热度统计、事件驱动交易等。
ChinaScope近期上线了基于Kafka的实时新闻流数据——SmarTag Stream,公众号第一时间申请到了试用权限,接下来,大家跟着编辑部一起,一路从kafka的消息流,到基于流处理框架Faust实现的股票实时监控,来探索一下新闻流数据在量化场景的潜在应用。
首先简单介绍一下新闻数据的结构,SmarTag对每天新闻进行结构化处理,首先会提取新闻中的标签,其次会对新闻及新闻中的公司人物的进行情绪分析,最终会以Json格式的推送处理完的新闻结构化数据,该数据中有三个属性:
各属性内的比较重要(并非全部)的字段如下:
Kafka消息流的几个核心概念
作为技术小白,我们只需要理解,kafka是用来从服务端到客户端推送消息的。消息的生产者Producer产生消息后,放入对应Topic的消息队列,Broker会对这些消息进行分发推送,客户端的消费者Consumer(需订阅该Topic)接收到消息后进行处理及应用。
用Python接收新闻流数据
对于技术小白们来说,Kafka的运行机制并不需要详细的了解,只需要关心,怎么才能接收到数据?以及接收到数据之后,该怎么处理?
我们首先来解答第一个问题,怎么接受数据?我们以小白标配语言Python为例,Python里有好几个kafka的工具包,包括python-kafka, aiokafka等,我们这里以python-kafka为例。安装很简单:
pip install python-kafka
安装完成之后,我们导入KafkaConsumer,并进行相关配置,主要配置项包括Topic、Group、Broker及登录用户名密码:
# encoding utf-8
from kafka import KafkaConsumer # 注意这里是kafka,不是python-kafka
import json
def get_news_stream(start_from_newest=True):
"""
:param start_from_newest: False, 从上次结束消费的地方接收信息,True, 从最新的消息开始接收
:return:
"""
consumer = KafkaConsumer('TOPIC_NAME',
group_id='GROUP_ID',
enable_auto_commit=True,
auto_commit_interval_ms=1000,
auto_offset_reset='latest',
security_protocol='SASL_PLAINTEXT',
sasl_mechanism="SCRAM-SHA-512",
sasl_plain_username="USERNAME",
sasl_plain_password="PASSWORD",
bootstrap_servers=['broker1.xxxxxx.com:19092',
'broker2.xxxxxx.com:19092']
)
if start_from_newest:
consumer.poll(10)
consumer.seek_to_end()
for msg in consumer:
print(json.loads(msg.value)) # 可以加入对于新闻处理的逻辑,这里只是简单Print
if __name__ == '__main__':
get_news_stream()
关于以上示例代码有几点需要说明,首先是几个参数的意义:
还有几个细节需要说明:
大家也注意到,这里收到消息进行处理完之后,才能消费下一条消息,就容易造成队列的阻塞。如果是简单的处理逻辑,倒不至于阻塞。但如果处理逻辑过于复杂,那下一条消息就必须等待上一条消息处理完成后才能被消费。这种情况下,我们需要一个异步处理的流程,也就是消息的接收和处理分为不同的进程,相互独立。
关于流处理
流是一种数据传送技术,它把客户端数据变成一个稳定的流。正是由于数据传送呈现连续不停的形态,所以流引擎需要连续不断处理数据。比如实时高频的股票行情数据就可以看成是一个数据流,基于实时高频数据产生交易信号的过程就可以看做是一个流处理的过程。
与流处理对应的数据处理方式是批处理,传统的数据融合通常基于批模式。在批的模式下,我们会通过一些周期性运行的ETL JOB,将数据从关系型数据库、文件存储向下游的目标数据库进行同步,中间可能有各种类型的转换。
批处理一般是解决离线计算数据量大,计算时间慢的问题,流处理相反是为了解决实时计算或是近实时计算问题,当然有了实时的要求就会使处理的数据量变少,但是计算速度要求更快,两者相同点都要求数据计算的准确性有保障。
常见的流处理框架包括Kafka Streams、Apache Storm、Spark Stream、Samza及大名鼎鼎的Apache Flink,成熟的流处理框架在容错性、状态管理及性能上都有很大的保障。小编也看到很多量化私募开始招聘掌握Flink的技术大咖。当然,这些成熟的大框架不是小编今天的目标,只会用Python的小编当然搞不懂这些。
所以问题就来了,了解了流处理之后,Python中有没有好用的流处理框架,而且是支持Kafka的?当然有,这就是Faust~
基于Faust的新闻流处理
Faust是一个轻量的流处理框架,非常适合小型量化团队对于流处理的需求。Faust是一个将Kafka Streams的概念移植到Python的第三方库,安装Faust时需要注意安装的是faust-streaming,而不是faust,使用以下代码安装:
pip install faust-streaming # 注意不是pip install faust
接着我们通过一段简单的示例代码来了解如何通过Faust接入实时新闻流:
# news_stream.py
import faust
# 定义App
app = faust.App('GROUP_ID',
value_serializer='json',
topic_allow_declare=False,
topic_disable_leader=True,
broker=['kafka://broker1.xxxxxx.com:19092',
'kafka://broker2.xxxxxx.com:19092'],
broker_credentials=faust.SASLCredentials(
username='USERNAME',
password='PASSWORD',
mechanism='SCRAM-SHA-512')
)
# 订阅Topic
news_kafka_topic = app.topic('TOPIC_NAME')
# 针对Topic的处理函数
@app.agent(news_kafka_topic)
async def process(stream) -> None:
async for event in news_items.events():
print('News:{}'.format(event.message.timestamp))
print(event.value)
if __name__ == '__main__':
app.main()
首先实例化一个Faust流处理的应用程序,faust.App(),其中相关参数解释如下:
然后实例化Topic,news_kafka_topic = app.topic('TOPIC_NAME')。其中TOPIC_NAME为订阅的topic,必须与kafka消息流的Topic名称保持一致。
Faust代理是一个流处理器,它订阅一个主题并处理每条消息。在Faust中,代理(Agent)用于装饰异步函数,可以并行处理无限数据流。该代理用作您的处理函数的装饰器,异步函数必须使用异步for循环遍历数据流。对于同一个Topic,可以同时有多个Agent对其进行消息处理。
然后通过命令行,切换到当前py文件的目录,启动app,就可以实时接收并异步处理新闻数据了:
faust -A news_stream woker -l info #news_stream
自选股负面舆情监控
对于关注的股票列表进行舆情监控,如果出现显著负面的新闻,则发送邮件。具体的示例说明如下:
# encoding=utf-8
from abc import ABC
import faust
import smtplib
from email.mime.text import MIMEText
class NewsItem(faust.Record, ABC, serializer='json'):
newsInfo: dict
newsTags: list
emotionInfos: list
def send_mail(content):
# 邮件构建
subject = "负面新闻预警" # 邮件标题
sender = "*********@163.com" # 发送方
recver = "*******@163.com" # 接收方
password = "*****"
message = MIMEText(content, "plain", "utf-8")
# content 发送内容 "plain"文本格式 utf-8 编码格式
message['Subject'] = subject # 邮件标题
message['To'] = recver # 收件人
message['From'] = sender # 发件人
smtp = smtplib.SMTP_SSL("smtp.163.com", 994) # 实例化smtp服务器
smtp.login(sender, password) # 发件人登录
smtp.sendmail(sender, [recver], message.as_string()) # as_string 对 message 的消息进行了封装
smtp.close()
def news_alert(news_time, news_value):
sec_codes = ['300750_SZ_EQ', '002594_SZ_EQ', '600050_SH_EQ', '002230_SZ_EQ']
news_info = news_value.newsInfo
tag_info = news_value.newsTags
emotion_info = news_value.emotionInfos
for tag in tag_info:
if tag['entityCode'] in sec_codes and tag['itemRelevance']>=0.5:
for emotion in emotion_info:
if emotion['entityCode'] == tag['entityCode'] and emotion['emotionIndicator']==2 and emotion['emotionWeight']>=0.8:
content = "{}【负面预警】:{}-{} \n 【新闻摘要】{}".format(news_time, tag['entityCode'], emotion['emotionWeight'], news_info['newsSummary'])
send_mail(content)
app = faust.App('GROUP_NAME',
value_serializer='json',
topic_allow_declare=False,
topic_disable_leader=True,
broker=['kafka://xxxx.com:19092',
'kafka://yyyy.com:19092'],
broker_credentials=faust.SASLCredentials(
username='USERNAME',
password='PASSWORD',
mechanism='SCRAM-SHA-512')
)
news_kafka_topic = app.topic('TOPIC_NAME', value_type=NewsItem)
@app.agent(news_kafka_topic)
async def process(news) -> None:
async for news_item in news.events():
news_alert(news_item.message.timestamp, news_item.value)
if __name__ == '__main__':
app.main()
运行以上代码,会实时监控关注股票的负面新闻,并发送邮件通知,相关代码说明如下:
总结
Faust流处理框架极大的方便了新闻流数据处理的效率,不用在担心各种细节。
Faust还有各种丰富的功能,包括数据中间表Tables、数据持续化、过滤器Filters等。基于这些功能,我们可以很快速的实现,如全市场股票情绪的监控,实时个股情绪因子及市场情绪指数的计算。
最后,公众号认为,无论是大机构还是小机构,积极拥抱不同的IT技术,以工程式的思维对待量化策略研究与开发一定是一条可持续的道路。加油!
有需要试用新闻流的小伙伴,请点击https://www.chinascope.com/tryout.html