在大语言模型(LLM)的实际应用中,我们很少只处理单条文本。无论是数据分析、内容生成还是模型训练,都需要面对海量文本数据的处理需求。批量处理技术是连接LLM与实际应用场景的关键桥梁,它能够显著提升处理效率、降低计算成本,并实现更复杂的数据流水线设计。
随着2025年LLM技术的快速发展,批量处理技术也在不断演进。现代LLM框架提供了丰富的批量处理工具和优化策略,使得即使在有限资源条件下,也能高效处理大规模文本数据集。本文将全面介绍LLM批量处理的核心概念、技术实现和最佳实践,帮助读者掌握从数据准备到模型推理的完整批量处理流程。
在本文中,我们将学习:
批量处理(Batch Processing)是指将多个数据样本组合在一起进行统一处理的技术。在LLM应用中,批量处理通常涉及将多条文本组合成一个批次(batch),然后一次性输入到模型中进行处理。
单条处理:样本1 → 模型 → 结果1
样本2 → 模型 → 结果2
样本3 → 模型 → 结果3
批量处理:[样本1, 样本2, 样本3] → 模型 → [结果1, 结果2, 结果3]即使只有一个样本,在深度学习模型处理时也需要将其包装为包含一个样本的batch。这是因为现代深度学习框架和硬件都针对批量计算进行了优化。
批量处理在LLM应用中具有多方面的优势:
在开始批量处理实践之前,我们需要了解一些核心概念:
为了进行LLM批量处理,我们需要确保Python环境配置正确。推荐使用Python 3.10或更高版本,以获得最佳性能和兼容性。
# 检查Python版本
import sys
print(f"Python版本: {sys.version}")我们需要安装以下核心库来支持LLM批量处理:
# 安装基础库
pip install torch transformers datasets accelerate
# 安装数据处理库
pip install pandas numpy tqdm
# 安装并行处理库
pip install dask joblib截至2025年,这些库的最新版本都针对批量处理进行了显著优化:
安装完成后,我们可以通过以下代码验证所有库是否正确安装:
import torch
import transformers
import datasets
import accelerate
import pandas as pd
import numpy as np
import tqdm
import dask
print(f"Torch版本: {torch.__version__}")
print(f"Transformers版本: {transformers.__version__}")
print(f"Datasets版本: {datasets.__version__}")
print(f"Accelerate版本: {accelerate.__version__}")
print("所有库安装成功!")在实际应用中,我们经常需要处理自己的文本数据。下面介绍如何创建和加载自定义数据集:
from datasets import Dataset
import pandas as pd
# 创建示例数据
data = {
"text": [
"这是第一个文本样本,用于批量处理测试。",
"这是第二个较长的文本样本,包含更多的内容和信息。",
"这是第三个文本样本,测试批量处理的效果。",
"这是第四个文本样本,用于演示如何处理不同长度的文本。",
"这是第五个文本样本,批量处理能够显著提高效率。"
],
"label": [0, 1, 0, 1, 0]
}
# 转换为pandas DataFrame
df = pd.DataFrame(data)
# 创建Hugging Face Dataset
dataset = Dataset.from_pandas(df)
print(f"数据集包含 {len(dataset)} 个样本")
print("数据集示例:")
print(dataset[0])对于大型数据集,我们可以使用Datasets库的流式处理功能,避免一次性加载全部数据到内存:
from datasets import load_dataset
# 流式加载大型数据集
dataset = load_dataset(
"text",
data_files={"train": "large_text_file.txt"},
streaming=True # 启用流式处理
)
# 查看前5个样本
print("前5个样本:")
for i, example in enumerate(dataset["train"].take(5)):
print(f"样本 {i+1}: {example['text'][:100]}...")在批量处理前,我们通常需要对数据进行预处理。以下是一个完整的预处理流程示例:
from datasets import Dataset
from transformers import AutoTokenizer
import pandas as pd
import re
# 示例文本数据
data = {
"text": [
"Hello world! This is a test. 123@#$",
"ANOTHER EXAMPLE with Mixed CASE and extra spaces.",
"This text contains\nline breaks and\ttabs."
]
}
df = pd.DataFrame(data)
dataset = Dataset.from_pandas(df)
# 定义预处理函数
def preprocess_function(example):
# 转换为小写
text = example["text"].lower()
# 移除特殊字符
text = re.sub(r"[^a-zA-Z0-9\s]", "", text)
# 移除多余的空格
text = re.sub(r"\s+", " ", text)
# 移除行首行尾空格
text = text.strip()
return {"text": text}
# 应用预处理函数
dataset = dataset.map(preprocess_function)
print("预处理后的数据集:")
for i, example in enumerate(dataset):
print(f"样本 {i+1}: {example['text']}")DataLoader是PyTorch中用于批量加载数据的核心组件,它能够自动处理批量采样、打乱数据和并行加载等功能:
from torch.utils.data import DataLoader, TensorDataset
import torch
# 创建示例数据
texts = ["文本1", "文本2", "文本3", "文本4", "文本5", "文本6"]
# 假设我们已经将文本转换为token IDs
token_ids = torch.tensor([
[101, 2003, 102],
[101, 2004, 102],
[101, 2005, 102],
[101, 2006, 102],
[101, 2007, 102],
[101, 2008, 102]
])
# 创建数据集
dataset = TensorDataset(token_ids)
# 创建DataLoader
dataloader = DataLoader(
dataset,
batch_size=2, # 批次大小
shuffle=True, # 打乱数据
num_workers=2 # 并行加载的线程数
)
# 遍历DataLoader
print("批量加载的数据:")
for batch in dataloader:
print(batch[0])Hugging Face提供了与DataLoader集成的便捷方式:
from transformers import AutoTokenizer
from torch.utils.data import DataLoader
from datasets import Dataset
# 创建示例数据集
data = {"text": ["文本1", "文本2", "文本3", "文本4", "文本5", "文本6"]}
dataset = Dataset.from_dict(data)
# 加载分词器
tokenizer = AutoTokenizer.from_pretrained("bert-base-chinese")
# 分词函数
def tokenize_function(examples):
return tokenizer(examples["text"], padding="max_length", truncation=True, max_length=128)
# 应用分词器
tokenized_dataset = dataset.map(tokenize_function, batched=True)
# 转换为PyTorch格式
tokenized_dataset.set_format("torch", columns=["input_ids", "attention_mask"])
# 创建DataLoader
dataloader = DataLoader(
tokenized_dataset,
batch_size=2,
shuffle=True
)
# 遍历DataLoader
print("分词后的批量数据:")
for batch in dataloader:
print("Input IDs:", batch["input_ids"])
print("Attention Mask:", batch["attention_mask"])
print("---")对于更复杂的数据处理需求,我们可以自定义collate函数:
from torch.utils.data import DataLoader
from transformers import AutoTokenizer
# 自定义collate函数
def custom_collate_fn(batch):
# 提取文本
texts = [item["text"] for item in batch]
# 分词处理
tokenized = tokenizer(
texts,
padding=True,
truncation=True,
max_length=128,
return_tensors="pt"
)
# 返回处理后的批次
return tokenized
# 创建DataLoader
dataloader = DataLoader(
dataset,
batch_size=2,
collate_fn=custom_collate_fn
)
# 测试自定义collate函数
print("自定义collate函数处理的批量数据:")
for batch in dataloader:
print(batch)
print("---")以下是使用Transformers库进行基础批量推理的示例:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
# 加载模型和分词器
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english")
model = AutoModelForSequenceClassification.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english")
# 示例文本
texts = [
"I love this product! It's amazing.",
"This is terrible. I would not recommend it.",
"The quality is okay, but could be better.",
"Absolutely fantastic! Will buy again.",
"Not what I expected. Disappointed."
]
# 批量编码
inputs = tokenizer(
texts,
padding=True,
truncation=True,
max_length=128,
return_tensors="pt"
)
# 批量推理
with torch.no_grad():
outputs = model(**inputs)
predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)
# 解析结果
for i, text in enumerate(texts):
sentiment = "积极" if predictions[i][1] > predictions[i][0] else "消极"
confidence = predictions[i].max().item()
print(f"文本: {text}")
print(f"预测: {sentiment} (置信度: {confidence:.4f})")
print()Transformers的Pipeline接口提供了更简洁的批量推理方式:
from transformers import pipeline
# 创建情感分析pipeline
sentiment_analyzer = pipeline("sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english")
# 批量推理
results = sentiment_analyzer(texts)
# 打印结果
for text, result in zip(texts, results):
print(f"文本: {text}")
print(f"预测: {result['label']} (置信度: {result['score']:.4f})")
print()对于文本生成任务,批量处理需要特别注意输出长度的控制:
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
# 加载模型和分词器
tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2")
# 确保使用正确的填充token
tokenizer.pad_token = tokenizer.eos_token
# 示例提示
prompts = [
"Once upon a time,",
"In a distant galaxy,",
"The future of AI is",
"How to be successful in"
]
# 批量编码
inputs = tokenizer(
prompts,
padding=True,
return_tensors="pt"
)
# 批量生成文本
outputs = model.generate(
**inputs,
max_new_tokens=50,
do_sample=True,
temperature=0.7,
pad_token_id=tokenizer.eos_token_id # 确保正确处理填充
)
# 解码并打印结果
for i, output in enumerate(outputs):
generated_text = tokenizer.decode(output, skip_special_tokens=True)
print(f"提示: {prompts[i]}")
print(f"生成: {generated_text}")
print()选择合适的批量大小是性能优化的关键。以下是确定最佳批量大小的方法:
def find_optimal_batch_size(model, tokenizer, sample_texts):
"""
通过递增测试找到最佳批量大小
"""
# 开始时使用较小的批量大小
batch_size = 1
max_batch_size = 128 # 设置最大尝试值
optimal_batch_size = 1
# 测试不同的批量大小
while batch_size <= max_batch_size:
try:
# 创建当前批量大小的输入
inputs = tokenizer(
sample_texts * batch_size, # 重复样本以达到所需批量大小
padding=True,
return_tensors="pt"
).to(next(model.parameters()).device)
# 尝试前向传播
with torch.no_grad():
outputs = model(**inputs)
# 如果成功,记录并增加批量大小
optimal_batch_size = batch_size
batch_size *= 2
print(f"成功处理批量大小: {optimal_batch_size}")
except RuntimeError as e:
# 内存不足错误
if "out of memory" in str(e):
print(f"内存不足,批量大小 {batch_size} 太大")
break
else:
# 其他运行时错误
raise e
return optimal_batch_size
# 使用示例
sample_text = "This is a sample text for batch size optimization."
sample_texts = [sample_text] * 4 # 4个样本作为基础
optimal_size = find_optimal_batch_size(model, tokenizer, sample_texts)
print(f"最佳批量大小: {optimal_size}")梯度检查点(Gradient Checkpointing)是一种在训练过程中减少内存使用的技术:
from transformers import AutoModelForSequenceClassification
# 加载模型并启用梯度检查点
model = AutoModelForSequenceClassification.from_pretrained(
"bert-base-uncased",
gradient_checkpointing=True # 启用梯度检查点
)
print("梯度检查点已启用,这将减少约30-40%的内存使用,但会略微增加计算时间")混合精度可以显著减少内存使用并提高计算效率:
from transformers import AutoTokenizer, AutoModelForCausalLM
from torch.cuda.amp import autocast
# 加载模型和分词器
tokenizer = AutoTokenizer.from_pretrained("gpt2")
model = AutoModelForCausalLM.from_pretrained("gpt2").cuda()
# 示例文本
texts = ["This is a test", "Another example"]
# 使用autocast进行混合精度推理
with autocast():
inputs = tokenizer(texts, padding=True, return_tensors="pt").to("cuda")
outputs = model.generate(**inputs, max_new_tokens=20)
# 解码结果
results = tokenizer.batch_decode(outputs, skip_special_tokens=True)
for result in results:
print(result)Python的multiprocessing模块可以实现简单的并行处理:
from multiprocessing import Pool
import time
def process_text(text):
"""模拟文本处理函数"""
time.sleep(1) # 模拟耗时操作
return f"处理结果: {text.upper()}"
# 示例文本列表
texts = [f"文本{i}" for i in range(10)]
# 单进程处理
print("单进程处理:")
start_time = time.time()
results_single = [process_text(text) for text in texts]
end_time = time.time()
print(f"单进程耗时: {end_time - start_time:.2f}秒")
# 多进程处理
print("\n多进程处理:")
start_time = time.time()
with Pool(processes=4) as pool:
results_multi = pool.map(process_text, texts)
end_time = time.time()
print(f"多进程耗时: {end_time - start_time:.2f}秒")
print(f"加速比: {(end_time_start_single) / (end_time - start_time):.2f}x")对于大规模数据,Dask提供了更强大的分布式处理能力:
import dask.dataframe as dd
import pandas as pd
# 创建示例数据
data = {"text": [f"文本{i}" for i in range(1000)]}
df = pd.DataFrame(data)
# 转换为Dask DataFrame
ddf = dd.from_pandas(df, npartitions=4)
# 定义处理函数
def process_row(row):
return row["text"].upper()
# 应用处理函数
ddf["processed"] = ddf.apply(process_row, axis=1, meta=('processed', 'object'))
# 计算并获取结果
result = ddf.compute()
print(f"处理完成,结果长度: {len(result)}")
print("前5条处理结果:")
print(result.head())Hugging Face的Accelerate库提供了更高级的分布式推理支持:
from accelerate import Accelerator
from transformers import AutoTokenizer, AutoModelForSequenceClassification
# 初始化加速器
accelerator = Accelerator()
# 加载模型和分词器
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english")
model = AutoModelForSequenceClassification.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english")
# 使用加速器准备模型
model = accelerator.prepare(model)
# 示例文本
texts = ["I love this!", "This is terrible.", "It's okay."] * 100
# 批量处理
batch_size = 32
results = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i+batch_size]
inputs = tokenizer(batch, padding=True, truncation=True, return_tensors="pt")
inputs = {k: v.to(accelerator.device) for k, v in inputs.items()}
with accelerator.no_sync():
outputs = model(**inputs)
predictions = accelerator.gather(outputs.logits)
results.extend(predictions.cpu().numpy())
print(f"分布式处理完成,共处理 {len(results)} 个样本")对于超大文件,我们可以使用流式处理来避免内存溢出:
def stream_process_large_file(file_path, process_func, batch_size=1000):
"""
流式处理大文件
参数:
file_path: 文件路径
process_func: 处理函数
batch_size: 批次大小
"""
batch = []
result_count = 0
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line: # 跳过空行
batch.append(line)
# 当批次达到指定大小时进行处理
if len(batch) >= batch_size:
# 处理当前批次
process_func(batch)
result_count += len(batch)
print(f"已处理 {result_count} 行")
# 清空批次
batch = []
# 处理剩余的行
if batch:
process_func(batch)
result_count += len(batch)
return result_count
# 示例处理函数
def simple_processor(batch):
# 这里只是简单示例,实际应用中可以进行更复杂的处理
processed = [text.upper() for text in batch]
# 这里可以将处理结果写入输出文件
return processed
# 使用示例
# total_processed = stream_process_large_file("large_text_file.txt", simple_processor)
# print(f"总共处理 {total_processed} 行")对于非常大的数据集,可以使用内存映射文件来优化内存使用:
import numpy as np
def create_memory_mapped_dataset(data, filename, dtype=np.float32):
"""
创建内存映射数据集
"""
# 计算所需的总空间
total_size = data.nbytes
# 创建内存映射文件
mmapped_array = np.memmap(
filename,
dtype=dtype,
mode='w+',
shape=data.shape
)
# 写入数据
mmapped_array[:] = data[:]
mmapped_array.flush() # 确保数据写入磁盘
print(f"内存映射文件创建成功: {filename}")
print(f"数据形状: {data.shape}, 数据类型: {dtype}")
return mmapped_array
def load_memory_mapped_dataset(filename, shape, dtype=np.float32):
"""
加载内存映射数据集
"""
mmapped_array = np.memmap(
filename,
dtype=dtype,
mode='r',
shape=shape
)
print(f"内存映射文件加载成功: {filename}")
return mmapped_array
# 使用示例
# 创建随机数据
# data = np.random.rand(1000000, 768).astype(np.float32)
# 创建内存映射文件
# mmapped_array = create_memory_mapped_dataset(data, "embeddings.dat")
# 加载内存映射文件
# loaded_array = load_memory_mapped_dataset("embeddings.dat", (1000000, 768))对于长时间运行的处理任务,实现增量处理和检查点机制非常重要:
import os
import json
import pandas as pd
def process_with_checkpoints(input_file, output_file, checkpoint_file, process_func, batch_size=1000):
"""
带检查点的批量处理函数
"""
# 检查是否存在检查点
start_line = 0
if os.path.exists(checkpoint_file):
with open(checkpoint_file, 'r') as f:
checkpoint = json.load(f)
start_line = checkpoint.get('last_processed_line', 0)
print(f"检测到检查点,从第 {start_line} 行开始处理")
# 读取输入文件的总行数
with open(input_file, 'r', encoding='utf-8') as f:
total_lines = sum(1 for _ in f)
print(f"总文件行数: {total_lines}")
# 打开输入和输出文件
processed_count = 0
with open(input_file, 'r', encoding='utf-8') as f_in, \
open(output_file, 'a', encoding='utf-8') as f_out:
# 跳过已经处理过的行
for _ in range(start_line):
next(f_in)
batch = []
current_line = start_line
for line in f_in:
line = line.strip()
if line:
batch.append(line)
current_line += 1
# 当批次达到指定大小或到达文件末尾时处理
if len(batch) >= batch_size:
# 处理批次
results = process_func(batch)
# 写入结果
for result in results:
f_out.write(f"{result}\n")
# 刷新输出文件
f_out.flush()
# 更新计数器
processed_count += len(batch)
# 保存检查点
checkpoint = {'last_processed_line': current_line}
with open(checkpoint_file, 'w') as f:
json.dump(checkpoint, f)
# 打印进度
progress = (current_line / total_lines) * 100
print(f"进度: {progress:.2f}%, 已处理: {processed_count} 行")
# 清空批次
batch = []
# 处理剩余的批次
if batch:
results = process_func(batch)
for result in results:
f_out.write(f"{result}\n")
processed_count += len(batch)
# 更新检查点
checkpoint = {'last_processed_line': current_line}
with open(checkpoint_file, 'w') as f:
json.dump(checkpoint, f)
print(f"处理完成! 共处理 {processed_count} 行")
return processed_count
# 使用示例
# def example_processor(batch):
# return [text.upper() for text in batch]
#
# process_with_checkpoints(
# input_file="large_input.txt",
# output_file="processed_output.txt",
# checkpoint_file="processing_checkpoint.json",
# process_func=example_processor,
# batch_size=1000
# )下面是一个完整的情感分析批量处理流水线示例:
import os
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
from torch.utils.data import DataLoader
from datasets import Dataset, load_dataset
import torch
import tqdm
def build_sentiment_analysis_pipeline():
# 1. 加载数据集
print("步骤1: 加载数据集")
if not os.path.exists("reviews.csv"):
# 创建示例数据
data = {
"review": [
"这款产品非常好用,我很满意!",
"质量一般,没有达到预期。",
"服务态度很好,但价格偏高。",
"绝对物超所值,强烈推荐!",
"体验很差,不会再购买了。"
] * 1000 # 创建1000条样本
}
df = pd.DataFrame(data)
df.to_csv("reviews.csv", index=False)
print(f"创建了示例数据集,共 {len(df)} 条评论")
else:
df = pd.read_csv("reviews.csv")
print(f"加载了现有数据集,共 {len(df)} 条评论")
# 转换为Dataset对象
dataset = Dataset.from_pandas(df)
# 2. 加载模型和分词器
print("\n步骤2: 加载模型和分词器")
model_name = "uer/roberta-base-finetuned-jd-binary-chinese"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)
# 如果有GPU,移动模型到GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
print(f"使用设备: {device}")
# 3. 预处理函数
def preprocess_function(examples):
return tokenizer(examples["review"], padding="max_length", truncation=True, max_length=128)
# 4. 处理数据集
print("\n步骤3: 预处理数据集")
tokenized_dataset = dataset.map(preprocess_function, batched=True)
tokenized_dataset.set_format("torch", columns=["input_ids", "attention_mask"])
# 5. 创建DataLoader
batch_size = 32
dataloader = DataLoader(tokenized_dataset, batch_size=batch_size)
# 6. 批量推理
print("\n步骤4: 批量推理")
all_predictions = []
all_scores = []
model.eval()
with torch.no_grad():
for batch in tqdm.tqdm(dataloader, desc="处理批次"):
# 移动批次到正确的设备
batch = {k: v.to(device) for k, v in batch.items()}
# 前向传播
outputs = model(**batch)
# 计算概率
probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)
# 获取预测和分数
predictions = torch.argmax(probabilities, dim=-1).cpu().numpy()
scores = probabilities.max(dim=-1)[0].cpu().numpy()
# 保存结果
all_predictions.extend(predictions)
all_scores.extend(scores)
# 7. 保存结果
print("\n步骤5: 保存结果")
results = pd.DataFrame({
"review": df["review"],
"sentiment": ["正面" if pred == 1 else "负面" for pred in all_predictions],
"confidence": all_scores
})
results.to_csv("sentiment_analysis_results.csv", index=False)
# 8. 分析结果
print("\n步骤6: 分析结果")
positive_count = sum(results["sentiment"] == "正面")
negative_count = sum(results["sentiment"] == "负面")
print(f"分析完成!")
print(f"正面评论: {positive_count} ({positive_count/len(results)*100:.2f}%)")
print(f"负面评论: {negative_count} ({negative_count/len(results)*100:.2f}%)")
print(f"平均置信度: {results['confidence'].mean():.4f}")
return results
# 运行流水线
# results = build_sentiment_analysis_pipeline()以下是一个结合分类和聚类的文本处理流水线:
import pandas as pd
from transformers import AutoTokenizer, AutoModel
import torch
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
import numpy as np
def build_text_analysis_pipeline():
# 1. 加载数据
print("步骤1: 加载文本数据")
# 假设我们有一个包含新闻文章的数据集
if not os.path.exists("news_articles.csv"):
# 创建示例数据
data = {
"title": [f"新闻标题{i}" for i in range(100)],
"content": [f"这是第{i}篇新闻文章的内容,包含了各种信息和观点。" * 5 for i in range(100)]
}
df = pd.DataFrame(data)
df.to_csv("news_articles.csv", index=False)
print(f"创建了示例数据集,共 {len(df)} 篇文章")
else:
df = pd.read_csv("news_articles.csv")
print(f"加载了现有数据集,共 {len(df)} 篇文章")
# 2. 加载模型提取嵌入
print("\n步骤2: 提取文本嵌入")
model_name = "bert-base-chinese"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)
# 移动模型到GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
# 批量提取嵌入函数
def get_embeddings(texts, batch_size=32):
model.eval()
all_embeddings = []
for i in range(0, len(texts), batch_size):
batch_texts = texts[i:i+batch_size]
# 编码
inputs = tokenizer(
batch_texts,
padding=True,
truncation=True,
max_length=512,
return_tensors="pt"
).to(device)
# 提取嵌入
with torch.no_grad():
outputs = model(**inputs)
# 使用[CLS] token的嵌入作为句子表示
embeddings = outputs.last_hidden_state[:, 0, :].cpu().numpy()
all_embeddings.extend(embeddings)
return np.array(all_embeddings)
# 提取所有文章的嵌入
embeddings = get_embeddings(df["content"].tolist())
print(f"嵌入提取完成,形状: {embeddings.shape}")
# 3. 标准化嵌入
print("\n步骤3: 标准化嵌入")
scaler = StandardScaler()
normalized_embeddings = scaler.fit_transform(embeddings)
# 4. K-means聚类
print("\n步骤4: 执行K-means聚类")
num_clusters = 5 # 假设我们想分成5个类别
kmeans = KMeans(n_clusters=num_clusters, random_state=42, n_init=10)
cluster_labels = kmeans.fit_predict(normalized_embeddings)
# 5. 保存结果
print("\n步骤5: 保存聚类结果")
df["cluster"] = cluster_labels
# 分析每个聚类的文章数量
cluster_counts = df["cluster"].value_counts().sort_index()
print("聚类统计:")
for cluster, count in cluster_counts.items():
print(f"聚类 {cluster}: {count} 篇文章")
# 保存结果
df.to_csv("clustered_news_articles.csv", index=False)
return df
# 运行流水线
# clustered_df = build_text_analysis_pipeline()对于复杂任务,我们可以集成多个模型进行批量处理:
import pandas as pd
from transformers import pipeline
import time
def build_multi_model_pipeline():
# 1. 准备数据
print("步骤1: 准备输入数据")
data = {
"text": [
"苹果公司今天发布了新款iPhone,销量预计将创新高。",
"这部电影真是太棒了,演员的表演非常出色!",
"明天天气如何?会不会下雨?",
"你能帮我翻译这句话吗:Hello world!",
"Python中如何实现快速排序算法?"
] * 100 # 500条样本
}
df = pd.DataFrame(data)
print(f"数据集准备完成,共 {len(df)} 条文本")
# 2. 加载多个模型
print("\n步骤2: 加载处理模型")
# 情感分析模型
sentiment_analyzer = pipeline("sentiment-analysis", model="uer/roberta-base-finetuned-jd-binary-chinese")
# 命名实体识别模型
ner_pipeline = pipeline("ner", model="ckiplab/bert-base-chinese-ner", aggregation_strategy="simple")
# 文本分类模型
classifier = pipeline("text-classification", model="uer/roberta-base-finetuned-jd-binary-chinese")
# 3. 批量处理函数
def batch_process_texts(texts, batch_size=16):
results = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i+batch_size]
# 情感分析
sentiment_results = sentiment_analyzer(batch)
# 命名实体识别
ner_results = []
for text in batch:
entities = ner_pipeline(text)
ner_results.append(entities)
# 文本分类
classification_results = classifier(batch)
# 合并结果
for j, text in enumerate(batch):
result = {
"text": text,
"sentiment": sentiment_results[j]["label"],
"sentiment_score": sentiment_results[j]["score"],
"entities": ner_results[j],
"classification": classification_results[j]["label"],
"classification_score": classification_results[j]["score"]
}
results.append(result)
# 打印进度
print(f"已处理 {min(i+batch_size, len(texts))}/{len(texts)} 条文本")
return results
# 4. 执行批量处理
print("\n步骤3: 执行多模型批量处理")
start_time = time.time()
processed_results = batch_process_texts(df["text"].tolist(), batch_size=32)
end_time = time.time()
print(f"\n处理完成!耗时: {end_time - start_time:.2f}秒")
print(f"平均处理速度: {len(df)/(end_time - start_time):.2f} 条/秒")
# 5. 保存和分析结果
print("\n步骤4: 保存和分析结果")
# 转换实体识别结果为字符串格式以便保存
for result in processed_results:
result["entities_str"] = str(result["entities"])
# 创建结果DataFrame
results_df = pd.DataFrame(processed_results)
results_df = results_df[["text", "sentiment", "sentiment_score", "entities_str", "classification", "classification_score"]]
# 保存结果
results_df.to_csv("multi_model_analysis_results.csv", index=False)
# 分析基本统计
sentiment_counts = results_df["sentiment"].value_counts()
classification_counts = results_df["classification"].value_counts()
print("\n处理结果统计:")
print("情感分析分布:")
print(sentiment_counts)
print("\n文本分类分布:")
print(classification_counts)
return results_df
# 运行流水线
# multi_model_results = build_multi_model_pipeline()根据2025年的最新研究和实践,以下是LLM批量处理的一些最佳实践:
在实际应用中,LLM批量处理面临一些常见挑战:
根据2025年最新的技术发展趋势,LLM批量处理领域正在向以下方向发展:
某新闻媒体公司需要对每天 millions 级别的新闻文本进行分类。通过优化的批量处理技术,他们将处理时间从原来的8小时减少到了45分钟。
关键优化点:
社交媒体分析公司需要实时分析用户发布的内容。通过批处理技术和流式处理相结合的方式,他们实现了准实时的情感分析。
系统架构:
大型企业需要从海量文档中提取知识。通过优化的批量嵌入生成和向量检索技术,他们构建了高效的企业知识库。
技术方案:
通过本文的学习,我们掌握了LLM批量处理的核心技术和最佳实践:
要进一步提升LLM批量处理能力,可以考虑以下学习路径:
批量处理是LLM从实验室走向实际应用的关键技术。随着LLM技术的不断发展和硬件的持续进步,批量处理的效率和能力也在不断提升。通过掌握本文介绍的技术和方法,你将能够构建高效、可扩展的LLM批量处理系统,为各种实际应用场景提供强大支持。
在未来的AI时代,批量处理技术将继续发挥重要作用,帮助我们更高效地处理和利用海量数据,释放AI技术的全部潜力。通过持续学习和实践,你将能够在这个快速发展的领域中保持竞争力,不断创新和进步。
通过批量处理技术,我们不仅能够提高效率,还能够探索更大规模、更复杂的AI应用场景,推动人工智能技术的普及和发展。让我们一起探索LLM批量处理的无限可能!