
在大数据时代,无限流数据已成为企业和机构的核心数据资产,如实时产生的舆情信息、客服工单、系统日志,但这类数据具有无界性、实时性、非结构化三大特征,传统的批量聚类算法(如 K-Means)和人工标注模式已无法满足“边产生、边处理、边应用”的需求。而增量聚类算法与大模型的结合,恰好能解决这一棘手的难题,增量聚类实现数据流的动态聚类,大模型完成聚类结果的实时智能打标,让大模型从处理静态数据走向驾驭无限流数据。
今天我们就由浅入深拆解“增量聚类 + 大模型”的核心逻辑,了解其概念和原理、业务的执行流程、详细的示例实现,深入的理解这一技术组合的实践逻辑。

无限流数据指的是持续产生、无固定边界、无法一次性加载到内存或磁盘中处理的数据,典型代表包括:
这类数据有三个核心特征,也是传统处理方式的痛点:
特征 | 具体表现 | 传统批量处理的问题 |
|---|---|---|
实时性 | 数据产生后需秒级、分钟级处理,否则失去价值(如舆情热点需及时响应) | 批量算法需等待数据积累,无法实时输出结果 |
无界性 | 数据持续产生,总量无限,无法一次性加载到内存进行聚类、分析 | 批量聚类(如 K-Means)需全量数据,内存撑爆 |
概念漂移 | 数据的分布、含义会随时间变化(如工单主题从 “退款” 变为 “物流”) | 批量模型训练后固定,无法适配数据分布变化 |
传统方案的两个核心问题:
而“增量聚类 + 大模型”的组合恰好解决这两个问题:
增量聚类是一类“边接收数据、边更新聚类结构”的算法,核心目标是:在不重新计算全量数据的前提下,将新数据融入已有的聚类结果中。对比传统批量聚类,两者的核心差异如下:
2.1.1 批量聚类(如 K-Means)
2.1.2 增量聚类(如增量 K-Means、StreamKM++)
批量聚类 vs 增量聚类内存占用和处理时间的性能对比:

概念漂移场景下,批量聚类 vs 增量聚类的准确性对比:

我们用整理书架的例子理解增量聚类:
增量聚类的核心操作只有三个:
3.1 大模型不是聚类器,而是语义解析器
基于大模型的强大能力,我们考虑:“大模型能不能直接做增量聚类?” 答案是:不适合。
原因很简单:大模型的核心优势是“语义理解、自然语言生成”,而非“数值型聚类计算”。增量聚类的核心是“高效的距离计算、聚类中心更新”,这是传统算法的强项;而聚类后的“类别是什么意思”,是大模型的强项。
大模型在整个流程中的核心作用分为三类:

3.2 大模型处理无限流数据的关键:轻量化与实时性
大模型通常处理单条或批量文本,怎么处理无限流数据,核心是两个优化:
简单来说:大模型处理的是“聚类后的聚合数据”,而非“原始无限流数据”,这是大模型能处理无限流数据的核心逻辑。
要理解增量聚类,首先要掌握传统 K-Means 的核心步骤,因为增量 K-Means 是最易理解的增量聚类算法:
K-Means 的核心问题:每次迭代都需要全量数据,如果数据是持续流入的,如每秒100条工单,无法每次都重新计算全量数据的聚类中心,这就是增量 K-Means 要解决的问题。
完整的细节可以参考往期文章《构建AI智能体:三十九、中文新闻智能分类:K-Means聚类与Qwen主题生成的融合应用》;
增量 K-Means 是传统 K-Means 的增量版本,核心思想是:不保存全量数据,只保存每个聚类的“统计信息”,包括聚类中心、数据点数量、总和,新数据到来时仅更新这些统计信息。 我们用公式和通俗解释结合的方式讲解:
2.1 核心统计信息
对每个聚类Ci,保存三个核心信息:
2.2 新数据到来时的更新逻辑
假设新数据点为x(向量):
2.3 通俗解释
还是用整理书架的例子:
2.4 增量 K-Means 的优势与局限
优势:
局限:
针对增量 K-Means 的局限,StreamKM++ 是专门为数据流设计的增量聚类算法,核心优化是:
StreamKM++ 的核心流程:

StreamKM++ 是实际应用处理流数据的常用算法,比增量 K-Means 更实用,后续我们也会更深入的详细讲解。
我们通过完整的执行流程,了解流程的核心特性:

流程说明:
- 1. 需求定义,明确业务目标:
- 2. 数据格式梳理,工单数据的典型格式(JSON):
{
"ticket_id": "10001",
"content": "我的快递丢了,已经3天了,能理赔吗?",
"create_time": "2026-02-25 10:00:01",
"user_id": "u89765"
}- 3. 数据预处理规则
from sentence_transformers import SentenceTransformer
from modelscope import snapshot_download
cache_dir = "D:\\modelscope\\hub"
model_dir = snapshot_download(
model_id="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
cache_dir=cache_dir,
revision="master" # 或指定分支/commit
)
# 1. 加载Embedding模型(轻量级Sentence-BERT)
model = SentenceTransformer("D:\\modelscope\\hub\\sentence-transformers\\paraphrase-multilingual-MiniLM-L12-v2")
def get_local_embedding(text):
"""本地生成Embedding向量"""
# 返回384维向量
embedding = model.encode(text, normalize_embeddings=True)
return embedding
# 测试
text = "我的快递丢了,能理赔吗?"
embedding = get_local_embedding(text)
print(f"Embedding向量长度:{len(embedding)}") 输出结果:
Embedding向量长度:384
我们选择 StreamKM++ 的简化版,MiniBatchKMeans,scikit-learn 内置,简单易实现
import numpy as np
from sklearn.cluster import MiniBatchKMeans
from sklearn.metrics import pairwise_distances_argmin_min
class IncrementalClustering:
def __init__(self, n_clusters=5, batch_size=100):
"""
初始化增量聚类器
:param n_clusters: 初始聚类数量
:param batch_size: 微批次大小
"""
self.n_clusters = n_clusters
self.batch_size = batch_size
# 初始化MiniBatchKMeans(增量K-Means)
self.cluster_model = MiniBatchKMeans(
n_clusters=n_clusters,
batch_size=batch_size,
random_state=42,
init_size=1000 # 初始聚类中心数量
)
# 保存每个聚类的文本示例(用于后续打标)
self.cluster_texts = {i: [] for i in range(n_clusters)}
# 保存聚类中心
self.cluster_centers = None
def update_cluster(self, embeddings, texts):
"""
增量更新聚类
:param embeddings: 新数据的Embedding向量列表(np.array)
:param texts: 对应的文本列表
:return: 聚类结果(每个数据的聚类标签)
"""
# 训练模型(增量更新)
labels = self.cluster_model.partial_fit(embeddings).predict(embeddings)
# 更新聚类中心
self.cluster_centers = self.cluster_model.cluster_centers_
# 保存每个聚类的文本示例(最多保存20条,避免内存占用)
for label, text in zip(labels, texts):
if len(self.cluster_texts[label]) < 20:
self.cluster_texts[label].append(text)
return labels
def get_cluster_examples(self):
"""获取每个聚类的文本示例"""
return self.cluster_texts
def get_nearest_text(self, center, texts):
"""获取离聚类中心最近的文本(最具代表性)"""
if len(texts) == 0:
return ""
# 计算距离
distances = pairwise_distances_argmin_min([center], texts)[1][0]
# 找到最近的文本
nearest_idx = np.argmin(distances)
return texts[nearest_idx]代码解释:
我们基于混元大模型实现聚类打标,Prompt 设计是核心:
def cluster_labeling(cluster_examples):
"""
对聚类结果进行打标(使用腾讯混元大模型)
:param cluster_examples: 每个聚类的文本示例(dict,{聚类标签: [文本1, 文本2,...]})
:return: 聚类标签映射({聚类标签: 打标结果})
"""
from openai import OpenAI
# 配置混元API
api_key = 'sk-bWlJPKjBrSFGoQ0Ys***********************'
client = OpenAI(
api_key=api_key,
base_url="https://api.hunyuan.cloud.tencent.com/v1",
)
label_mapping = {}
for cluster_id, examples in cluster_examples.items():
if len(examples) == 0:
label_mapping[cluster_id] = "未知主题"
continue
# 构建Prompt
prompt = f"""
你是一个专业的客服工单分类分析师,需要根据以下文本示例总结聚类的核心内容:
文本示例:
{chr(10).join([f"- {text}" for text in examples[:10]])}
总结要求:
1. 用不超过20个字的标题概括聚类核心内容;
2. 标题要简洁、准确,符合客服工单场景;
3. 避免模糊表述,如"其他问题""未知主题";
4. 核心要突出用户诉求,如"快递丢失理赔咨询""退款申请处理"。
"""
try:
# 调用混元大模型
completion = client.chat.completions.create(
model="hunyuan-lite",
messages=[
{'role': 'system', 'content': '你是一个专业的客服工单分类分析师'},
{'role': 'user', 'content': prompt}
],
temperature=0.1,
max_tokens=50
)
# 提取打标结果
label = completion.choices[0].message.content.strip()
label_mapping[cluster_id] = label
except Exception as e:
print(f"聚类{cluster_id}打标失败:{e}")
label_mapping[cluster_id] = f"未识别-{cluster_id}"
return label_mapping代码解释:
import time
import json
# 1. 模拟流数据生成(初学者测试用)
def generate_simulated_tickets():
"""生成模拟工单数据"""
ticket_templates = [
"我的快递丢了,能理赔吗?",
"快递延迟了3天,要求赔偿",
"退款申请提交后,商家不处理",
"商品质量有问题,要求退货",
"物流信息不更新,怎么办?"
]
while True:
# 随机选择模板生成工单
content = np.random.choice(ticket_templates)
ticket = {
"ticket_id": f"sim_{int(time.time())}",
"content": content,
"create_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"user_id": f"u_{np.random.randint(1000, 9999)}"
}
yield ticket
# 模拟每秒生成1条工单
time.sleep(1)
# 2. 流数据处理主逻辑
def stream_processing():
"""实时流数据处理"""
# 初始化增量聚类器
clusterer = IncrementalClustering(n_clusters=5, batch_size=10)
# 缓存批次数据(每10条处理一次)
batch_embeddings = []
batch_texts = []
batch_size = 10
# 生成模拟数据
ticket_generator = generate_simulated_tickets()
for ticket in ticket_generator:
text = ticket["content"]
# 清洗文本
text = text.strip().replace("!", "").replace("?", "")
if not text:
continue
# 获取Embedding
embedding = get_local_embedding(text)
if embedding is None:
continue
# 添加到批次
batch_embeddings.append(embedding)
batch_texts.append(text)
# 批次满了,更新聚类
if len(batch_embeddings) >= batch_size:
# 转换为numpy数组
embeddings_np = np.array(batch_embeddings)
# 增量更新聚类
labels = clusterer.update_cluster(embeddings_np, batch_texts)
# 获取聚类示例,打标
cluster_examples = clusterer.get_cluster_examples()
label_mapping = cluster_labeling(cluster_examples)
# 输出结果
print(f"\n=== 批次处理结果({time.strftime('%Y-%m-%d %H:%M:%S')})===")
for i, (text, label) in enumerate(zip(batch_texts, labels)):
print(f"工单{i+1}:{text} → 聚类{label} → 标签:{label_mapping[label]}")
# 清空批次
batch_embeddings = []
batch_texts = []
# 启动处理
if __name__ == "__main__":
stream_processing()代码解释:
输出结果:
=== 批次处理结果(2026-02-25 22:45:45)=== 工单1:商品质量有问题,要求退货 → 聚类1 → 标签:商品质量问题,要求退换货 工单2:退款申请提交后,商家不处理 → 聚类3 → 标签:退款申请商家未处理 工单3:我的快递丢了,能理赔吗 → 聚类2 → 标签:快递丢失理赔咨询 工单4:物流信息不更新,怎么办 → 聚类0 → 标签:物流信息更新问题 工单5:快递延迟了3天,要求赔偿 → 聚类4 → 标签:快递延误赔偿 工单6:我的快递丢了,能理赔吗 → 聚类2 → 标签:快递丢失理赔咨询 工单7:退款申请提交后,商家不处理 → 聚类3 → 标签:退款申请商家未处理 工单8:我的快递丢了,能理赔吗 → 聚类2 → 标签:快递丢失理赔咨询 工单9:商品质量有问题,要求退货 → 聚类1 → 标签:商品质量问题,要求退换货 工单10:商品质量有问题,要求退货 → 聚类1 → 标签:商品质量问题,要求退换货 === 批次处理结果(2026-02-25 22:46:01)=== 工单1:快递延迟了3天,要求赔偿 → 聚类4 → 标签:快递延误赔偿 工单2:我的快递丢了,能理赔吗 → 聚类2 → 标签:快递丢失理赔咨询 工单3:物流信息不更新,怎么办 → 聚类0 → 标签:物流信息更新问题 工单4:我的快递丢了,能理赔吗 → 聚类2 → 标签:快递丢失理赔咨询 工单5:退款申请提交后,商家不处理 → 聚类3 → 标签:退款申请超时未处理 工单6:快递延迟了3天,要求赔偿 → 聚类4 → 标签:快递延误赔偿 工单7:快递延迟了3天,要求赔偿 → 聚类4 → 标签:快递延误赔偿 工单8:退款申请提交后,商家不处理 → 聚类3 → 标签:退款申请超时未处理 工单9:退款申请提交后,商家不处理 → 聚类3 → 标签:退款申请超时未处理 工单10:我的快递丢了,能理赔吗 → 聚类2 → 标签:快递丢失理赔咨询 ........
聚类和打标结果需要输出到业务系统,如工单管理平台,可以通过 Flask 搭建简易 API:
from flask import Flask, jsonify
app = Flask(__name__)
# 全局变量保存最新的聚类标签映射
latest_label_mapping = {}
@app.route("/api/cluster_labels", methods=["GET"])
def get_cluster_labels():
"""获取最新的聚类标签"""
return jsonify({
"status": "success",
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"data": latest_label_mapping
})
# 修改stream_processing中的打标部分,更新全局变量
def stream_processing():
global latest_label_mapping
# ... 原有逻辑 ...
label_mapping = cluster_labeling(cluster_examples)
# 更新全局变量
latest_label_mapping = label_mapping
# ... 原有逻辑 ...
# 启动Flask服务
if __name__ == "__main__":
# 多线程启动:一个线程处理流数据,一个线程启动Flask服务
import threading
t = threading.Thread(target=stream_processing)
t.start()
app.run(host="0.0.0.0", port=5000)访问http://localhost:5000/api/cluster_labels即可获取最新的聚类标签结果,业务系统可通过 API 调用集成。

通过对增量聚类结合大模型应用的完整了解,最大的感悟就是技术融合才是破局关键。以前单纯的认为大模型只能处理静态文本,增量聚类只是单纯的算法工具,没想到两者结合,居然能解决无限流数据这个难题,让大模型从离线工具变成了实时引擎。我们不用一开始就追求复杂的算法,先把核心逻辑吃透,增量聚类负责 动态分组,大模型负责语义翻译,一步步来反而更稳。建议先从模拟数据入手,把代码跑通,再替换成真实数据,慢慢优化参数和 Prompt。
在实际项目落地时不用盲目追求高精尖,优先用轻量化模型和批量处理,平衡好实时性和成本才是关键。技术的价值终究是落地到业务,把这个组合用在舆情、工单这些场景里,真正解决实际问题,才是学习的意义。
from sentence_transformers import SentenceTransformer
from modelscope import snapshot_download
cache_dir = "D:\\modelscope\\hub"
model_dir = snapshot_download(
model_id="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
cache_dir=cache_dir,
revision="master" # 或指定分支/commit
)
# 1. 加载Embedding模型(轻量级Sentence-BERT)
model = SentenceTransformer("D:\\modelscope\\hub\\sentence-transformers\\paraphrase-multilingual-MiniLM-L12-v2")
def get_local_embedding(text):
"""本地生成Embedding向量"""
# 返回384维向量
embedding = model.encode(text, normalize_embeddings=True)
return embedding
# 测试
text = "我的快递丢了,能理赔吗?"
embedding = get_local_embedding(text)
print(f"Embedding向量长度:{len(embedding)}")
import numpy as np
from sklearn.cluster import MiniBatchKMeans
from sklearn.metrics import pairwise_distances_argmin_min
class IncrementalClustering:
def __init__(self, n_clusters=5, batch_size=100):
"""
初始化增量聚类器
:param n_clusters: 初始聚类数量
:param batch_size: 微批次大小
"""
self.n_clusters = n_clusters
self.batch_size = batch_size
# 初始化MiniBatchKMeans(增量K-Means)
self.cluster_model = MiniBatchKMeans(
n_clusters=n_clusters,
batch_size=batch_size,
random_state=42,
init_size=1000 # 初始聚类中心数量
)
# 保存每个聚类的文本示例(用于后续打标)
self.cluster_texts = {i: [] for i in range(n_clusters)}
# 保存聚类中心
self.cluster_centers = None
def update_cluster(self, embeddings, texts):
"""
增量更新聚类
:param embeddings: 新数据的Embedding向量列表(np.array)
:param texts: 对应的文本列表
:return: 聚类结果(每个数据的聚类标签)
"""
# 训练模型(增量更新)
labels = self.cluster_model.partial_fit(embeddings).predict(embeddings)
# 更新聚类中心
self.cluster_centers = self.cluster_model.cluster_centers_
# 保存每个聚类的文本示例(最多保存20条,避免内存占用)
for label, text in zip(labels, texts):
if len(self.cluster_texts[label]) < 20:
self.cluster_texts[label].append(text)
return labels
def get_cluster_examples(self):
"""获取每个聚类的文本示例"""
return self.cluster_texts
def get_nearest_text(self, center, texts):
"""获取离聚类中心最近的文本(最具代表性)"""
if len(texts) == 0:
return ""
# 计算距离
distances = pairwise_distances_argmin_min([center], texts)[1][0]
# 找到最近的文本
nearest_idx = np.argmin(distances)
return texts[nearest_idx]
def cluster_labeling(cluster_examples):
"""
对聚类结果进行打标(使用腾讯混元大模型)
:param cluster_examples: 每个聚类的文本示例(dict,{聚类标签: [文本1, 文本2,...]})
:return: 聚类标签映射({聚类标签: 打标结果})
"""
from openai import OpenAI
# 配置混元API
api_key = 'sk-bW**********************5NP8Ze'
client = OpenAI(
api_key=api_key,
base_url="https://api.hunyuan.cloud.tencent.com/v1",
)
label_mapping = {}
for cluster_id, examples in cluster_examples.items():
if len(examples) == 0:
label_mapping[cluster_id] = "未知主题"
continue
# 构建Prompt
prompt = f"""
你是一个专业的客服工单分类分析师,需要根据以下文本示例总结聚类的核心内容:
文本示例:
{chr(10).join([f"- {text}" for text in examples[:10]])}
总结要求:
1. 用不超过20个字的标题概括聚类核心内容;
2. 标题要简洁、准确,符合客服工单场景;
3. 避免模糊表述,如"其他问题""未知主题";
4. 核心要突出用户诉求,如"快递丢失理赔咨询""退款申请处理"。
"""
try:
# 调用混元大模型
completion = client.chat.completions.create(
model="hunyuan-lite",
messages=[
{'role': 'system', 'content': '你是一个专业的客服工单分类分析师'},
{'role': 'user', 'content': prompt}
],
temperature=0.1,
max_tokens=50
)
# 提取打标结果
label = completion.choices[0].message.content.strip()
label_mapping[cluster_id] = label
except Exception as e:
print(f"聚类{cluster_id}打标失败:{e}")
label_mapping[cluster_id] = f"未识别-{cluster_id}"
return label_mapping
import time
import json
# 1. 模拟流数据生成(初学者测试用)
def generate_simulated_tickets():
"""生成模拟工单数据"""
ticket_templates = [
"我的快递丢了,能理赔吗?",
"快递延迟了3天,要求赔偿",
"退款申请提交后,商家不处理",
"商品质量有问题,要求退货",
"物流信息不更新,怎么办?"
]
while True:
# 随机选择模板生成工单
content = np.random.choice(ticket_templates)
ticket = {
"ticket_id": f"sim_{int(time.time())}",
"content": content,
"create_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"user_id": f"u_{np.random.randint(1000, 9999)}"
}
yield ticket
# 模拟每秒生成1条工单
time.sleep(1)
# 2. 流数据处理主逻辑
def stream_processing():
"""实时流数据处理"""
# 初始化增量聚类器
clusterer = IncrementalClustering(n_clusters=5, batch_size=10)
# 缓存批次数据(每10条处理一次)
batch_embeddings = []
batch_texts = []
batch_size = 10
# 生成模拟数据
ticket_generator = generate_simulated_tickets()
for ticket in ticket_generator:
text = ticket["content"]
# 清洗文本
text = text.strip().replace("!", "").replace("?", "")
if not text:
continue
# 获取Embedding
embedding = get_local_embedding(text)
if embedding is None:
continue
# 添加到批次
batch_embeddings.append(embedding)
batch_texts.append(text)
# 批次满了,更新聚类
if len(batch_embeddings) >= batch_size:
# 转换为numpy数组
embeddings_np = np.array(batch_embeddings)
# 增量更新聚类
labels = clusterer.update_cluster(embeddings_np, batch_texts)
# 获取聚类示例,打标
cluster_examples = clusterer.get_cluster_examples()
label_mapping = cluster_labeling(cluster_examples)
# 输出结果
print(f"\n=== 批次处理结果({time.strftime('%Y-%m-%d %H:%M:%S')})===")
for i, (text, label) in enumerate(zip(batch_texts, labels)):
print(f"工单{i+1}:{text} → 聚类{label} → 标签:{label_mapping[label]}")
# 清空批次
batch_embeddings = []
batch_texts = []
# 启动处理
if __name__ == "__main__":
stream_processing()原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。