首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >用Python一键量化市场情绪:揭秘任意时刻的大盘涨跌密码

用Python一键量化市场情绪:揭秘任意时刻的大盘涨跌密码

作者头像
子晓聊技术
发布2026-04-23 16:48:18
发布2026-04-23 16:48:18
1490
举报
文章被收录于专栏:子晓AI量化子晓AI量化

其实源代码很简单, 通过同花顺问财就可以获取,可能一些同学没经验。 那这篇文章就分享下技术思路。 文末见源代码。

我上面的截图是上周一 中美MY战当天的涨跌幅情况, 大家可以看到, 9点25分的时候只有117家涨停, 情绪非常恐慌, 如果当时买入是不是胜率很高。 有点马后炮了。

股市瞬息万变,每一分钟都蕴含着宝贵的信息。作为投资者,你是否曾想过:

  • 开盘集合竞价那一刻,市场情绪是乐观还是悲观?
  • 收盘前最后几分钟,哪些板块在悄然发力?
  • 某个特定时间点,涨停和跌停的股票都集中在哪些概念?

这些问题看似简单,但要找到答案却需要繁琐的数据整理和分析工作。写这个工具的目的, 就是让它能辅助自己一键获取任意时间点的市场全景,深度解析涨跌背后的板块密码。

传统行情软件往往只能查看最新数据或前一天的收盘数据。通过简洁直观的界面,我们可以选择任意历史日期,并精确到分钟(如09:25集合竞价、11:30早盘收盘、14:57尾盘等),系统会自动抓取并分析该时间点的全市场数据。

技术栈选择:

  • Streamlit:用于快速构建交互式Web界面,让数据分析变得直观易用
  • pywencai:接入问财数据接口,获取实时、准确的金融数据
  • Pandas:进行高效的数据清洗、处理和分析
  • Plotly:创建专业、交互式的数据可视化图表
  • 正则表达式:智能识别和匹配数据字段,提高代码的健壮性

这里贴一下完整代码,参考下思路, 具体根据自己的实际情况改造。 备注:如果发现格式有多余的特殊字符,用普通浏览器打开复制应该没问题。 希望我的分享对大家有所帮助

代码语言:javascript
复制
import streamlit as st
import pywencai
import pandas as pd
import re
from datetime import datetime, time
import plotly.express as px
import plotly.graph_objects as go
def get_market_change_data(target_date, target_time):
    """
    统计指定日期和时分的大盘涨跌幅情况
    target_date: 日期字符串,格式如 '20251014'
    target_time: 时间字符串,格式如 '09:25'
    """
    try:
        # 构建查询语句,获取指定时间的大盘数据
        query = f"{target_date} {target_time} 大盘涨跌幅,所属概念"
        # 获取数据
        df = pywencai.get(query=query, loop=True)
        if df is None or df.empty:
            return None
        # 模糊匹配涨跌幅字段
        change_columns = []
        patterns = [
            r'.*涨跌幅.*',
            r'.*涨幅.*',
            r'.*跌幅.*',
            r'.*涨跌.*',
            r'.*幅度.*'
        ]
        for pattern in patterns:
            matched_cols = [col for col in df.columns if re.search(pattern, col, re.IGNORECASE)]
            change_columns.extend(matched_cols)
        # 去重
        change_columns = list(set(change_columns))
        if not change_columns:
            return None
        # 选择最相关的涨跌幅列
        selected_column = None
        for col in change_columns:
            if '分时' in col or target_time.replace(':', '') in col or target_time in col:
                selected_column = col
                break
        if not selected_column and change_columns:
            selected_column = change_columns[0]
        # 数据清洗和统计
        df[selected_column] = pd.to_numeric(df[selected_column], errors='coerce')
        valid_data = df[df[selected_column].notna()].copy()
        if valid_data.empty:
            return None
        # 基本统计信息
        total_count = len(valid_data)
        up_count = len(valid_data[valid_data[selected_column] > 0])
        down_count = len(valid_data[valid_data[selected_column] < 0])
        flat_count = len(valid_data[valid_data[selected_column] == 0])
        up_ratio = (up_count / total_count) * 100 if total_count > 0 else 0
        down_ratio = (down_count / total_count) * 100 if total_count > 0 else 0
        flat_ratio = (flat_count / total_count) * 100 if total_count > 0 else 0
        max_increase = valid_data[selected_column].max()
        min_increase = valid_data[selected_column].min()
        avg_increase = valid_data[selected_column].mean()
        max_stock = valid_data.loc[valid_data[selected_column].idxmax()]
        min_stock = valid_data.loc[valid_data[selected_column].idxmin()]
        # 涨停跌停统计
        limit_up = valid_data[valid_data[selected_column] >= 9.9].copy()
        limit_down = valid_data[valid_data[selected_column] <= -9.9].copy()
        # 提取概念信息
        def extract_concepts(concept_str):
            if pd.isna(concept_str):
                return []
            return [c.strip() for c in re.split(r'[,;,;]', concept_str) if c.strip()]
        # 涨停概念统计
        limit_up_concepts = []
        for _, row in limit_up.iterrows():
            concepts = extract_concepts(row.get('所属概念', ''))
            for concept in concepts:
                limit_up_concepts.append({
                    '股票代码': row.get('股票代码', ''),
                    '股票简称': row.get('股票简称', ''),
                    '涨跌幅': row[selected_column],
                    '概念': concept
                })
        # 跌停概念统计
        limit_down_concepts = []
        for _, row in limit_down.iterrows():
            concepts = extract_concepts(row.get('所属概念', ''))
            for concept in concepts:
                limit_down_concepts.append({
                    '股票代码': row.get('股票代码', ''),
                    '股票简称': row.get('股票简称', ''),
                    '涨跌幅': row[selected_column],
                    '概念': concept
                })
        # 按涨跌幅区间统计
        bins = [-float('inf'), -5, -3, -1, 0, 1, 3, 5, float('inf')]
        labels = ['<-5%', '-5%至-3%', '-3%至-1%', '0%', '0%至1%', '1%至3%', '3%至5%', '>5%']
        range_stats = pd.cut(valid_data[selected_column], bins=bins, labels=labels).value_counts().sort_index()
        # 返回统计结果
        stats = {
            '统计日期': target_date,
            '统计时间': target_time,
            '总家数': total_count,
            '上涨家数': up_count,
            '下跌家数': down_count,
            '平盘家数': flat_count,
            '上涨比例': up_ratio,
            '下跌比例': down_ratio,
            '平盘比例': flat_ratio,
            '最大涨幅': max_increase,
            '最大跌幅': min_increase,
            '平均涨跌幅': avg_increase,
            '涨跌幅列名': selected_column,
            '涨停数量': len(limit_up),
            '跌停数量': len(limit_down),
            '涨停股票': limit_up[['股票代码', '股票简称', selected_column, '所属概念']].copy(),
            '跌停股票': limit_down[['股票代码', '股票简称', selected_column, '所属概念']].copy(),
            '涨停概念': pd.DataFrame(limit_up_concepts),
            '跌停概念': pd.DataFrame(limit_down_concepts),
            '区间分布': range_stats,
            '详细数据': valid_data[['股票代码', '股票简称', selected_column, '所属概念']].copy()
        }
        return stats
    except Exception as e:
        st.error(f"统计过程中发生错误: {e}")
        return None
# Streamlit 应用
st.set_page_config(
    page_title="大盘涨跌统计",
    page_icon="📈",
    layout="wide",
    initial_sidebar_state="expanded"
)
# 左侧边栏 - 查询控制
with st.sidebar:
    st.title("🔍 查询控制")
    st.markdown("---")
    # 日期选择
    st.subheader("📅 选择日期")
    selected_date = st.date_input(
        "统计日期",
        datetime.now().date(),
        help="选择要统计的日期"
    )
    # 时间选择
    st.subheader("🕐 选择时间")
    # 预设时间选项
    time_options = {
        "开盘前集合竞价 (09:25)": "09:25",
        "开盘 (09:30)": "09:30",
        "早盘收盘 (11:30)": "11:30",
        "午盘开盘 (13:00)": "13:00",
        "收盘前 (14:57)": "14:57",
        "收盘 (15:00)": "15:00",
        "自定义时间": None
    }
    selected_option = st.selectbox(
        "选择预设时间",
        list(time_options.keys()),
        help="选择常用时间点或自定义时间"
    )
    # 根据选择设置时间
    if selected_option == "自定义时间":
        selected_time = st.time_input(
            "自定义时间",
            value=time(9, 25),
            help="选择具体的时分"
        )
        target_time = selected_time.strftime("%H:%M")
    else:
        target_time = time_options[selected_option]
        st.info(f"已选择时间: {target_time}")
    # 查询按钮
    st.markdown("---")
    if st.button("🚀 开始统计", type="primary", use_container_width=True):
        target_date = selected_date.strftime('%Y%m%d')
        with st.spinner(f"正在获取 {target_date} {target_time} 的数据..."):
            result = get_market_change_data(target_date, target_time)
        if result is None:
            st.session_state.stats_result = None
            st.error("未获取到数据,请检查日期和时间是否有效")
        else:
            st.session_state.stats_result = result
            st.success(f"✅ 成功获取数据")
            st.balloons()
    # 显示当前查询状态
    st.markdown("---")
    st.subheader("📊 查询状态")
    if 'stats_result' in st.session_state and st.session_state.stats_result:
        result = st.session_state.stats_result
        st.info(f"已加载 {result['统计日期']} {result['统计时间']} 数据")
        st.metric("总家数", result['总家数'])
        st.metric("涨停数", result['涨停数量'])
        st.metric("跌停数", result['跌停数量'])
    else:
        st.warning("暂无数据,请先查询")
    # 查询提示
    st.markdown("---")
    st.markdown("""
    <div style='font-size: 12px; color: gray;'>
    <p><strong>提示:</strong></p>
    <ul>
    <li>交易时间:09:30-11:30, 13:00-15:00</li>
    <li>集合竞价:09:15-09:25</li>
    <li>建议选择整点或半点时间</li>
    </ul>
    </div>
    """, unsafe_allow_html=True)
# 主区域 - 内容展示
st.title("📈 大盘涨跌统计")
st.markdown("统计指定日期和时分的大盘涨跌情况,包括涨停跌停股票及其概念分布")
# 检查是否有数据
if 'stats_result' in st.session_state and st.session_state.stats_result:
    result = st.session_state.stats_result
    # 显示查询的日期时间
    st.markdown(f"### 📊 {result['统计日期']} {result['统计时间']} 市场统计")
    # 使用tabs组织内容
    tab1, tab2, tab3, tab4 = st.tabs([
        "📊 概览",
        "🚨 涨停跌停",
        "📈 概念分析",
        "📋 详细数据"
    ])
    with tab1:
        # 概览页面
        st.header("📊 市场概览")
        # 关键指标卡片
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("📈 上涨家数", f"{result['上涨家数']}", f"{result['上涨比例']:.2f}%")
        with col2:
            st.metric("📉 下跌家数", f"{result['下跌家数']}", f"{result['下跌比例']:.2f}%", delta_color="inverse")
        with col3:
            st.metric("🔴 涨停数", f"{result['涨停数量']}")
        with col4:
            st.metric("🟢 跌停数", f"{result['跌停数量']}")
        st.markdown("---")
        # 涨跌幅统计
        col1, col2, col3 = st.columns(3)
        with col1:
            st.metric("⬆️ 最大涨幅", f"{result['最大涨幅']:.2f}%")
        with col2:
            st.metric("⬇️ 最大跌幅", f"{result['最大跌幅']:.2f}%")
        with col3:
            st.metric("📊 平均涨跌", f"{result['平均涨跌幅']:.2f}%")
        # 区间分布图
        st.markdown("---")
        st.subheader("📊 涨跌幅区间分布")
        range_df = pd.DataFrame({
            '区间': result['区间分布'].index,
            '数量': result['区间分布'].values,
            '比例': (result['区间分布'].values / result['总家数'] * 100).round(2)
        })
        fig = go.Figure()
        fig.add_trace(go.Bar(
            name='数量',
            x=range_df['区间'],
            y=range_df['数量'],
            marker_color='lightblue'
        ))
        fig.add_trace(go.Scatter(
            name='比例(%)',
            x=range_df['区间'],
            y=range_df['比例'],
            mode='lines+markers',
            line=dict(color='firebrick', width=3),
            yaxis='y2'
        ))
        fig.update_layout(
            title="涨跌幅区间分布",
            xaxis_title="涨跌幅区间",
            yaxis=dict(title="数量", side="left"),
            yaxis2=dict(title="比例(%)", overlaying="y", side="right"),
            height=400
        )
        st.plotly_chart(fig, use_container_width=True)
    with tab2:
        # 涨停跌停页面
        st.header("🚨 涨停跌停分析")
        st.subheader(f"🔴 涨停股票 ({result['涨停数量']}家)")
        if not result['涨停股票'].empty:
            st.dataframe(
                result['涨停股票'],
                use_container_width=True,
                hide_index=True,
                height=500
            )
        else:
            st.info("该时间点无涨停股票")
        st.subheader(f"🟢 跌停股票 ({result['跌停数量']}家)")
        if not result['跌停股票'].empty:
            st.dataframe(
                result['跌停股票'],
                use_container_width=True,
                hide_index=True,
                height=500
            )
        else:
            st.info("该时间点无跌停股票")
    with tab3:
        # 概念分析页面
        st.header("📈 概念板块分析")
        if not result['涨停概念'].empty or not result['跌停概念'].empty:
            col1, col2 = st.columns(2)
            with col1:
                if not result['涨停概念'].empty:
                    st.subheader("🔴 涨停概念TOP15")
                    concept_counts = result['涨停概念']['概念'].value_counts().head(15)
                    fig = px.bar(
                        x=concept_counts.values,
                        y=concept_counts.index,
                        orientation='h',
                        labels={'x': '涨停数量', 'y': '概念'},
                        title="涨停概念分布"
                    )
                    fig.update_layout(height=600, yaxis={'categoryorder': 'total ascending'})
                    st.plotly_chart(fig, use_container_width=True)
                    with st.expander("查看详情"):
                        st.dataframe(result['涨停概念'], hide_index=True)
            with col2:
                if not result['跌停概念'].empty:
                    st.subheader("🟢 跌停概念TOP15")
                    concept_counts = result['跌停概念']['概念'].value_counts().head(15)
                    fig = px.bar(
                        x=concept_counts.values,
                        y=concept_counts.index,
                        orientation='h',
                        labels={'x': '跌停数量', 'y': '概念'},
                        title="跌停概念分布"
                    )
                    fig.update_layout(height=600, yaxis={'categoryorder': 'total ascending'})
                    st.plotly_chart(fig, use_container_width=True)
                    with st.expander("查看详情"):
                        st.dataframe(result['跌停概念'], hide_index=True)
        else:
            st.info("该时间点暂无涨停跌停概念数据")
    with tab4:
        # 详细数据页面
        st.header("📋 详细数据")
        # 数据概览
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("记录数", len(result['详细数据']))
        with col2:
            st.metric("统计日期", result['统计日期'])
        with col3:
            st.metric("统计时间", result['统计时间'])
        with col4:
            st.metric("数据列", result['涨跌幅列名'])
        # 数据表格
        st.markdown("---")
        st.subheader("完整数据列表")
        st.dataframe(
            result['详细数据'],
            use_container_width=True,
            hide_index=True,
            height=500
        )
        # 下载按钮
        csv = result['详细数据'].to_csv(index=False).encode('utf-8-sig')
        st.download_button(
            label="📥 下载CSV文件",
            data=csv,
            file_name=f"{result['统计日期']}_{result['统计时间'].replace(':', '')}_market_data.csv",
            mime="text/csv",
            use_container_width=True
        )
else:
    # 无数据时的提示
    st.markdown("---")
    st.info("👈 请在左侧选择日期和时间并点击'开始统计'按钮查询数据")

如果我的分享对你投资有所帮助,不吝啬给个点赞关注呗。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-10-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 子晓聊技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 这里贴一下完整代码,参考下思路, 具体根据自己的实际情况改造。 备注:如果发现格式有多余的特殊字符,用普通浏览器打开复制应该没问题。 希望我的分享对大家有所帮助
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档