序列标注(Sequence Labeling)是自然语言处理(NLP)中的一项基础任务,其目标是为序列中的每个元素分配一个标签。在NLP领域,序列标注技术广泛应用于分词、词性标注、命名实体识别、情感分析等任务。
序列标注任务的核心要素包括:
输入序列: [词1, 词2, 词3, ..., 词n]
输出序列: [标签1, 标签2, 标签3, ..., 标签n]
序列标注在NLP应用中具有基础性作用:
序列标注技术的发展经历了以下几个重要阶段:
传统统计方法阶段(1990s-2000s)
特征工程阶段(2000s-2010s)
深度学习阶段(2010s-2018)
预训练语言模型时代(2018至今)
隐马尔可夫模型(Hidden Markov Model, HMM)是一种基于概率的序列建模方法,广泛应用于早期的序列标注任务。
HMM假设观测序列(如词语)是由隐藏状态序列(如词性标签)生成的,模型包含三个关键参数:
HMM有三个基本问题:
import numpy as np
class HMM:
def __init__(self, n_states, n_observations):
self.n_states = n_states
self.n_observations = n_observations
# 初始化模型参数
self.pi = np.ones(n_states) / n_states # 初始状态概率
self.A = np.ones((n_states, n_states)) / n_states # 状态转移概率
self.B = np.ones((n_states, n_observations)) / n_observations # 发射概率
def forward(self, observations):
# 前向算法
T = len(observations)
alpha = np.zeros((T, self.n_states))
# 初始化
alpha[0] = self.pi * self.B[:, observations[0]]
# 递推
for t in range(1, T):
for s in range(self.n_states):
alpha[t, s] = np.sum(alpha[t-1] * self.A[:, s]) * self.B[s, observations[t]]
return alpha
def backward(self, observations):
# 后向算法
T = len(observations)
beta = np.zeros((T, self.n_states))
# 初始化
beta[T-1] = 1
# 递推
for t in range(T-2, -1, -1):
for s in range(self.n_states):
beta[t, s] = np.sum(self.A[s, :] * self.B[:, observations[t+1]] * beta[t+1])
return beta
def viterbi(self, observations):
# Viterbi算法,用于解码
T = len(observations)
delta = np.zeros((T, self.n_states))
psi = np.zeros((T, self.n_states), dtype=int)
# 初始化
delta[0] = self.pi * self.B[:, observations[0]]
# 递推
for t in range(1, T):
for s in range(self.n_states):
trans_probs = delta[t-1] * self.A[:, s]
delta[t, s] = np.max(trans_probs) * self.B[s, observations[t]]
psi[t, s] = np.argmax(trans_probs)
# 回溯
path = np.zeros(T, dtype=int)
path[T-1] = np.argmax(delta[T-1])
for t in range(T-2, -1, -1):
path[t] = psi[t+1, path[t+1]]
return path, delta
def baum_welch(self, observations, max_iter=100, tol=1e-6):
# Baum-Welch算法,用于参数估计
T = len(observations)
for _ in range(max_iter):
# E步
alpha = self.forward(observations)
beta = self.backward(observations)
# 计算ξ_t(i,j) = P(q_t = i, q_{t+1} = j | O, λ)
xi = np.zeros((T-1, self.n_states, self.n_states))
for t in range(T-1):
denominator = np.sum(np.outer(alpha[t], beta[t+1] * self.B[:, observations[t+1]]) * self.A)
for i in range(self.n_states):
for j in range(self.n_states):
xi[t, i, j] = alpha[t, i] * self.A[i, j] * self.B[j, observations[t+1]] * beta[t+1, j] / denominator
# 计算γ_t(i) = P(q_t = i | O, λ)
gamma = np.zeros((T, self.n_states))
for t in range(T):
denominator = np.sum(alpha[t] * beta[t])
gamma[t] = alpha[t] * beta[t] / denominator
# M步
# 更新pi
new_pi = gamma[0]
# 更新A
new_A = np.sum(xi, axis=0) / np.sum(gamma[:-1], axis=0, keepdims=True).T
# 更新B
new_B = np.zeros((self.n_states, self.n_observations))
for s in range(self.n_states):
for o in range(self.n_observations):
new_B[s, o] = np.sum(gamma[t, s] for t in range(T) if observations[t] == o)
new_B[s] /= np.sum(gamma[:, s])
# 检查收敛
if np.max(np.abs(new_pi - self.pi)) < tol and \
np.max(np.abs(new_A - self.A)) < tol and \
np.max(np.abs(new_B - self.B)) < tol:
break
# 更新参数
self.pi = new_pi
self.A = new_A
self.B = new_B条件随机场(Conditional Random Field, CRF)是一种判别式概率模型,在序列标注任务中表现优异。
CRF直接建模条件概率P(Y|X),其中X是观测序列(输入),Y是标签序列(输出)。CRF假设标签序列Y满足马尔可夫性质,即当前标签只依赖于相邻的标签。
CRF使用特征函数来捕捉输入和标签之间的关系:
import numpy as np
from sklearn.metrics import classification_report
class LinearCRF:
def __init__(self, n_labels, feature_extractor):
self.n_labels = n_labels
self.feature_extractor = feature_extractor
self.weights = None
def extract_features(self, x, i, y_prev, y_curr):
# 提取特征
return self.feature_extractor(x, i, y_prev, y_curr)
def compute_score(self, x, y):
# 计算整个序列的分数
score = 0
y_prev = -1 # 起始标记
for i, y_curr in enumerate(y):
features = self.extract_features(x, i, y_prev, y_curr)
score += np.dot(self.weights, features)
y_prev = y_curr
return score
def forward_algorithm(self, x):
# 前向算法计算归一化因子
T = len(x)
# alpha[t][y] = 到位置t,标签为y的最大分数
alpha = np.zeros((T, self.n_labels))
# 初始化
for y in range(self.n_labels):
features = self.extract_features(x, 0, -1, y)
alpha[0, y] = np.dot(self.weights, features)
# 递推
for t in range(1, T):
for y_curr in range(self.n_labels):
max_score = -float('inf')
for y_prev in range(self.n_labels):
features = self.extract_features(x, t, y_prev, y_curr)
score = alpha[t-1, y_prev] + np.dot(self.weights, features)
if score > max_score:
max_score = score
alpha[t, y_curr] = max_score
return alpha
def viterbi_decode(self, x):
# Viterbi解码,找到最优标签序列
T = len(x)
# delta[t][y] = 到位置t,标签为y的最大分数
delta = np.zeros((T, self.n_labels))
# psi[t][y] = 记录达到位置t,标签为y时的前一个标签
psi = np.zeros((T, self.n_labels), dtype=int)
# 初始化
for y in range(self.n_labels):
features = self.extract_features(x, 0, -1, y)
delta[0, y] = np.dot(self.weights, features)
# 递推
for t in range(1, T):
for y_curr in range(self.n_labels):
max_score = -float('inf')
best_prev = 0
for y_prev in range(self.n_labels):
features = self.extract_features(x, t, y_prev, y_curr)
score = delta[t-1, y_prev] + np.dot(self.weights, features)
if score > max_score:
max_score = score
best_prev = y_prev
delta[t, y_curr] = max_score
psi[t, y_curr] = best_prev
# 回溯
y = np.zeros(T, dtype=int)
y[-1] = np.argmax(delta[-1])
for t in range(T-2, -1, -1):
y[t] = psi[t+1, y[t+1]]
return y
def train(self, X, Y, max_iter=100, learning_rate=0.01):
# 提取特征维度
sample_x, sample_y = X[0], Y[0]
sample_features = self.extract_features(sample_x, 0, -1, sample_y[0])
feature_dim = len(sample_features)
# 初始化权重
self.weights = np.zeros(feature_dim)
# 训练迭代
for _ in range(max_iter):
total_grad = np.zeros(feature_dim)
for x, y in zip(X, Y):
# 计算真实路径的特征期望
expected_real = np.zeros(feature_dim)
y_prev = -1
for i, y_curr in enumerate(y):
features = self.extract_features(x, i, y_prev, y_curr)
expected_real += features
y_prev = y_curr
# 计算所有路径的特征期望
expected_all = self._compute_expected_features(x)
# 梯度更新
total_grad += expected_real - expected_all
# 更新权重
self.weights += learning_rate * total_grad / len(X)
def _compute_expected_features(self, x):
# 计算所有路径的特征期望
T = len(x)
expected = np.zeros(len(self.weights))
# 计算前向和后向概率
alpha = self.forward_algorithm(x)
# 为了数值稳定性,取对数概率
log_z = np.logaddexp.reduce(alpha[-1])
# 起始位置的期望
for y in range(self.n_labels):
features = self.extract_features(x, 0, -1, y)
prob = np.exp(alpha[0, y] - log_z)
expected += features * prob
# 后续位置的期望
for t in range(1, T):
for y_prev in range(self.n_labels):
for y_curr in range(self.n_labels):
features = self.extract_features(x, t, y_prev, y_curr)
# 计算转移概率
trans_score = np.dot(self.weights, features)
prob = np.exp(alpha[t-1, y_prev] + trans_score + self._backward(t, y_curr, x) - log_z)
expected += features * prob
return expected
def _backward(self, t, y, x):
# 简化的后向计算,实际应用中应完整实现后向算法
# 这里仅作为示例
T = len(x)
if t == T - 1:
return 0
max_score = -float('inf')
for y_next in range(self.n_labels):
features = self.extract_features(x, t+1, y, y_next)
score = np.dot(self.weights, features)
max_score = max(max_score, score)
return max_score循环神经网络(RNN)因其能够捕捉序列数据的时序依赖关系,在序列标注任务中表现出色。
模型结构:
优势:
缺点:
长短期记忆网络(LSTM)和门控循环单元(GRU)通过引入门控机制,解决了传统RNN的梯度消失问题。
LSTM核心组件:
GRU核心组件:
双向LSTM(BiLSTM)同时考虑了序列的前向和后向信息,提供了更丰富的上下文表示。
Python实现示例:
import torch
import torch.nn as nn
import torch.optim as optim
class BiLSTMSequenceTagger(nn.Module):
def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, dropout=0.1):
super(BiLSTMSequenceTagger, self).__init__()
# 词嵌入层
self.embedding = nn.Embedding(vocab_size, embedding_dim)
# 双向LSTM层
self.lstm = nn.LSTM(
embedding_dim,
hidden_dim,
bidirectional=True,
batch_first=True,
dropout=dropout
)
# 输出层
self.fc = nn.Linear(hidden_dim * 2, output_dim)
# Dropout层
self.dropout = nn.Dropout(dropout)
def forward(self, x, x_lengths):
# x形状: [batch_size, seq_len]
batch_size, seq_len = x.shape
# 词嵌入
embedded = self.dropout(self.embedding(x)) # [batch_size, seq_len, embedding_dim]
# 处理变长序列
packed = nn.utils.rnn.pack_padded_sequence(
embedded,
x_lengths,
batch_first=True,
enforce_sorted=False
)
# LSTM前向传播
packed_output, _ = self.lstm(packed)
# 解压序列
output, _ = nn.utils.rnn.pad_packed_sequence(packed_output, batch_first=True)
# 输出层
logits = self.fc(self.dropout(output)) # [batch_size, seq_len, output_dim]
return logits
def predict(self, x, x_lengths, tag_pad_idx):
# 获取预测标签
logits = self.forward(x, x_lengths)
# 应用softmax
probs = torch.softmax(logits, dim=-1)
# 获取预测标签
predictions = torch.argmax(probs, dim=-1)
# 处理填充位置
mask = (x != tag_pad_idx).unsqueeze(-1)
predictions = predictions * mask.squeeze(-1)
return predictionsBiLSTM-CRF模型结合了BiLSTM的特征提取能力和CRF的结构化预测能力,是序列标注的经典模型。
核心组件:
工作流程:
BiLSTM-CRF使用负对数似然作为损失函数:
Loss = -log(exp(score(y_true)) / sum_y(exp(score(y))))
其中:
class BiLSTM_CRF(nn.Module):
def __init__(self, vocab_size, embedding_dim, hidden_dim, tagset_size, dropout=0.1):
super(BiLSTM_CRF, self).__init__()
self.embedding_dim = embedding_dim
self.hidden_dim = hidden_dim
self.tagset_size = tagset_size
# 词嵌入层
self.word_embeds = nn.Embedding(vocab_size, embedding_dim)
# BiLSTM层
self.lstm = nn.LSTM(
embedding_dim,
hidden_dim,
bidirectional=True,
batch_first=True,
dropout=dropout
)
# 线性层将LSTM输出映射到标签空间
self.hidden2tag = nn.Linear(hidden_dim * 2, tagset_size)
# CRF层参数:转移矩阵
self.transitions = nn.Parameter(
torch.randn(tagset_size, tagset_size)
)
# 初始化转移矩阵,使开始标签不能转移到结束标签,结束标签不能转移到其他标签
self.transitions.data[tagset_size-1, :] = -10000
self.transitions.data[:, 0] = -10000
def _get_lstm_features(self, sentence, lengths):
# 词嵌入
embeds = self.word_embeds(sentence) # [batch_size, seq_len, embedding_dim]
# 处理变长序列
packed = nn.utils.rnn.pack_padded_sequence(
embeds,
lengths,
batch_first=True,
enforce_sorted=False
)
# LSTM前向传播
lstm_out, _ = self.lstm(packed)
# 解压序列
lstm_out, _ = nn.utils.rnn.pad_packed_sequence(lstm_out, batch_first=True)
# 映射到标签空间
lstm_feats = self.hidden2tag(lstm_out) # [batch_size, seq_len, tagset_size]
return lstm_feats
def _forward_alg(self, feats):
# 前向算法计算所有路径的分数和
# feats形状: [seq_len, batch_size, tagset_size]
seq_len, batch_size, tagset_size = feats.shape
# 初始化前向变量
init_alphas = torch.full((batch_size, tagset_size), -10000.)
# 开始标签的初始分数为0
init_alphas[:, 0] = 0.
# 将前向变量放入GPU
forward_var = init_alphas
# 迭代句子中的每个单词
for feat in feats:
# feat形状: [batch_size, tagset_size]
# 创建转移分数的副本,将发射分数添加到每个可能的转移中
emit_score = feat.unsqueeze(2).expand(batch_size, tagset_size, tagset_size)
trans_score = self.transitions.unsqueeze(0).expand(batch_size, tagset_size, tagset_size)
next_tag_var = forward_var.unsqueeze(1).expand(batch_size, tagset_size, tagset_size) + trans_score + emit_score
# 对所有可能的前一个标签求和
forward_var = torch.logsumexp(next_tag_var, dim=1)
# 加上转移到结束标签的分数
terminal_var = forward_var + self.transitions[0]
alpha = torch.logsumexp(terminal_var, dim=1)
return alpha
def _score_sentence(self, feats, tags):
# 计算给定标签序列的分数
# feats形状: [seq_len, batch_size, tagset_size]
# tags形状: [seq_len, batch_size]
seq_len, batch_size = tags.shape
# 初始化分数
score = torch.zeros(batch_size)
# 从开始标签到第一个标签的转移
score += self.transitions[0, tags[0]]
# 累加转移分数和发射分数
for i in range(seq_len-1):
# 转移分数
score += self.transitions[tags[i], tags[i+1]]
# 发射分数
score += feats[i, torch.arange(batch_size), tags[i]]
# 加上最后一个标签到结束标签的转移分数
score += feats[-1, torch.arange(batch_size), tags[-1]]
score += self.transitions[tags[-1], 0]
return score
def neg_log_likelihood(self, sentence, tags, lengths):
# 计算负对数似然损失
# 处理序列长度
batch_size = sentence.shape[0]
max_length = sentence.shape[1]
# 提取特征
feats = self._get_lstm_features(sentence, lengths) # [batch_size, seq_len, tagset_size]
# 转换为[seq_len, batch_size, tagset_size]
feats = feats.transpose(0, 1)
# 计算所有路径的分数和
forward_score = self._forward_alg(feats)
# 计算正确路径的分数
gold_score = self._score_sentence(feats, tags.transpose(0, 1))
# 返回平均负对数似然
return torch.mean(forward_score - gold_score)
def forward(self, sentence, lengths):
# 前向传播,用于预测
# 提取特征
feats = self._get_lstm_features(sentence, lengths) # [batch_size, seq_len, tagset_size]
# 转换为[seq_len, batch_size, tagset_size]
feats = feats.transpose(0, 1)
seq_len, batch_size, tagset_size = feats.shape
# 使用Viterbi算法解码
# 初始化维特比变量
backpointers = []
vit = torch.full((batch_size, tagset_size), -10000.)
vit[:, 0] = 0. # 开始标签
for i in range(seq_len):
vit_prev = vit.unsqueeze(1)
trans = self.transitions.unsqueeze(0)
emit = feats[i].unsqueeze(1)
next_vit = vit_prev + trans + emit
# 找出每个状态的最佳前一个状态
best_tag_ids = torch.argmax(next_vit, dim=2)
best_score = torch.max(next_vit, dim=2)[0]
backpointers.append(best_tag_ids)
vit = best_score
# 加上转移到结束标签的分数
vit += self.transitions[0]
# 找出最佳路径的结束状态
best_tag_ids = torch.argmax(vit, dim=1)
# 回溯构建最佳路径
best_paths = [best_tag_ids[i].item() for i in range(batch_size)]
for bptrs_t in reversed(backpointers):
best_tag_ids = [bptrs_t[i][best_paths[i]] for i in range(batch_size)]
best_paths = [best_tag_ids[i] for i in range(batch_size)] + best_paths
# 将路径重塑为[batch_size, seq_len]
batch_paths = []
for i in range(batch_size):
path = best_paths[i::batch_size]
# 裁剪到实际序列长度
path = path[:lengths[i].item()]
# 填充到最大长度
path += [0] * (max_length - len(path))
batch_paths.append(path)
return torch.tensor(batch_paths)Transformer架构凭借其强大的并行计算能力和长距离依赖建模能力,在序列标注任务中取得了显著成果。
模型结构:
优势:
import math
import torch
import torch.nn as nn
import torch.optim as optim
class TransformerSequenceTagger(nn.Module):
    """Transformer-encoder token tagger.

    Pipeline: embedding (scaled by sqrt(d_model)) + positional encoding ->
    stacked self-attention layers -> per-token label logits.  Inputs are
    shaped [seq_len, batch_size], the default layout of nn.TransformerEncoder.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_heads, num_layers, output_dim, dropout=0.1):
        super(TransformerSequenceTagger, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.pos_encoder = PositionalEncoding(embedding_dim, dropout)
        layer = nn.TransformerEncoderLayer(
            d_model=embedding_dim,
            nhead=num_heads,
            dim_feedforward=hidden_dim,
            dropout=dropout
        )
        self.transformer_encoder = nn.TransformerEncoder(layer, num_layers=num_layers)
        self.fc = nn.Linear(embedding_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Return logits of shape [seq_len, batch_size, output_dim]."""
        # Scale embeddings by sqrt(d_model) as in the original Transformer.
        hidden = self.embedding(x) * math.sqrt(self.embedding.embedding_dim)
        hidden = self.pos_encoder(hidden)
        if mask is None:
            padding_mask = None
        else:
            # Turn a [seq_len, batch] 0/1 mask into the [batch, seq_len]
            # boolean "is padding" mask the encoder expects.
            padding_mask = mask.transpose(0, 1) == 0
        hidden = self.transformer_encoder(
            hidden,
            src_key_padding_mask=padding_mask
        )
        return self.fc(self.dropout(hidden))

    def predict(self, x, mask=None):
        """Greedy per-token label prediction."""
        return torch.argmax(self.forward(x, mask), dim=-1)
class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout=0.1, max_len=5000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)
# 生成位置编码
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer('pe', pe)
def forward(self, x):
# x形状: [seq_len, batch_size, embedding_dim]
x = x + self.pe[:x.size(0), :]
return self.dropout(x)BERT等预训练语言模型通过大规模无监督学习,获取了丰富的语言知识,极大提升了序列标注任务的性能。
基本流程:
输入处理:
BERT-CRF:结合BERT的特征提取和CRF的结构化预测
BERT-Softmax:直接使用BERT输出进行标签分类
BERT-LSTM-CRF:在BERT和CRF之间添加LSTM层
from transformers import BertForTokenClassification, BertTokenizer, TrainingArguments, Trainer
import torch
import seqeval.metrics
class BertSequenceTagger:
def __init__(self, model_name, num_labels):
self.tokenizer = BertTokenizer.from_pretrained(model_name)
self.model = BertForTokenClassification.from_pretrained(
model_name,
num_labels=num_labels
)
def tokenize_and_align_labels(self, sentences, labels=None):
# 处理输入序列并对齐标签
tokenized_inputs = self.tokenizer(
sentences,
padding='max_length',
truncation=True,
max_length=128,
return_tensors='pt'
)
if labels is not None:
aligned_labels = []
for i, sentence in enumerate(sentences):
word_ids = tokenized_inputs.word_ids(batch_index=i)
previous_word_idx = None
label_ids = []
for word_idx in word_ids:
# 处理特殊标记
if word_idx is None:
label_ids.append(-100) # 忽略特殊标记
# 处理同一单词的多个标记
elif word_idx != previous_word_idx:
label_ids.append(labels[i][word_idx])
else:
label_ids.append(labels[i][word_idx] if self.args.label_all_tokens else -100)
previous_word_idx = word_idx
aligned_labels.append(label_ids)
tokenized_inputs['labels'] = torch.tensor(aligned_labels)
return tokenized_inputs
def train(self, train_dataset, val_dataset, epochs=3, learning_rate=2e-5):
# 设置训练参数
training_args = TrainingArguments(
output_dir='./results',
num_train_epochs=epochs,
per_device_train_batch_size=16,
per_device_eval_batch_size=16,
warmup_steps=500,
weight_decay=0.01,
logging_dir='./logs',
evaluation_strategy='epoch'
)
# 初始化Trainer
trainer = Trainer(
model=self.model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=val_dataset,
compute_metrics=self.compute_metrics
)
# 开始训练
trainer.train()
def compute_metrics(self, pred):
# 计算评估指标
labels = pred.label_ids
preds = pred.predictions.argmax(-1)
# 忽略特殊位置
true_predictions = [
[p for (p, l) in zip(pred, label) if l != -100]
for pred, label in zip(preds, labels)
]
true_labels = [
[l for (p, l) in zip(pred, label) if l != -100]
for pred, label in zip(preds, labels)
]
# 计算准确率
results = seqeval.metrics.classification_report(
y_true=true_labels,
y_pred=true_predictions
)
return {
'precision': results['overall_precision'],
'recall': results['overall_recall'],
'f1': results['overall_f1'],
'accuracy': results['overall_accuracy']
}
def predict(self, sentences):
# 预测函数
self.model.eval()
tokenized_inputs = self.tokenizer(
sentences,
padding='max_length',
truncation=True,
max_length=128,
return_tensors='pt'
)
with torch.no_grad():
outputs = self.model(**tokenized_inputs)
predictions = torch.argmax(outputs.logits, dim=-1)
# 处理预测结果
results = []
for i, sentence in enumerate(sentences):
word_ids = tokenized_inputs.word_ids(batch_index=i)
previous_word_idx = None
sentence_labels = []
sentence_tokens = []
for word_idx, pred in zip(word_ids, predictions[i]):
if word_idx is not None and word_idx != previous_word_idx:
sentence_tokens.append(self.tokenizer.convert_ids_to_tokens(tokenized_inputs['input_ids'][i][word_ids.index(word_idx)]))
sentence_labels.append(pred.item())
previous_word_idx = word_idx
results.append({
'tokens': sentence_tokens,
'labels': sentence_labels
})
return results除BERT外,还有许多预训练模型在序列标注任务中表现出色。
RoBERTa (Robustly optimized BERT approach)是对BERT的改进,通过以下方式提升性能:
ALBERT (A Lite BERT)通过参数共享等技术,大幅减少模型参数,同时保持性能。
ELECTRA使用生成器-判别器架构,其中判别器学习区分真实和生成的标记。
DeBERTa (Decoding-enhanced BERT)通过解耦注意力机制和相对位置编码,提升了模型性能。
预训练语言模型的多语言版本为跨语言序列标注提供了强大支持。
XLM-RoBERTa是一个多语言预训练模型,支持100多种语言,在跨语言迁移学习任务中表现出色。
主要策略:
精确率(Precision):正确预测的正例数占总预测正例数的比例
Precision = TP / (TP + FP)
召回率(Recall):正确预测的正例数占实际正例数的比例
Recall = TP / (TP + FN)
F1分数:精确率和召回率的调和平均
F1 = 2 * Precision * Recall / (Precision + Recall)
准确率(Accuracy):正确预测的样本数占总样本数的比例
Accuracy = (TP + TN) / (TP + TN + FP + FN)
对于命名实体识别等任务,通常使用实体级别的评估指标。
只有当实体的边界和类型都正确时,才视为正确预测。
允许实体边界有一定偏差的匹配方式。
seqeval是一个专门用于序列标注评估的Python库,支持各种评估指标的计算。
使用示例:
from seqeval.metrics import classification_report, f1_score, precision_score, recall_score
# Gold labels and predicted labels (one list of IOB tags per sentence)
y_true = [['O', 'B-PER', 'I-PER', 'O', 'B-LOC'], ['B-PER', 'I-PER', 'O', 'B-LOC']]
y_pred = [['O', 'B-PER', 'I-PER', 'O', 'O'], ['B-PER', 'O', 'O', 'B-LOC']]
# Compute the entity-level F1 score
f1 = f1_score(y_true, y_pred)
print(f"F1 Score: {f1:.4f}")
# Produce a detailed per-entity-type report
report = classification_report(y_true, y_pred)
print(report)命名实体识别是最典型的序列标注任务之一,旨在识别文本中的人名、地名、组织名等实体。
常用的标签体系包括:
from transformers import BertForTokenClassification, BertTokenizer, TrainingArguments, Trainer
import torch
import seqeval.metrics
def compute_ner_metrics(pred):
    """Entity-level metrics for a HuggingFace EvalPrediction.

    BUG FIX: the original indexed the return value of
    seqeval.metrics.classification_report with 'overall_*' keys, which do
    not exist (by default the function even returns a formatted string);
    use the dedicated metric functions instead.
    NOTE(review): seqeval expects IOB tag strings, but these sequences
    hold raw label ids — confirm ids are mapped to tag names upstream.
    """
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)
    # Drop positions carrying the ignore label (-100) from both streams.
    true_predictions = [
        [p for (p, l) in zip(pred_row, label_row) if l != -100]
        for pred_row, label_row in zip(preds, labels)
    ]
    true_labels = [
        [l for (p, l) in zip(pred_row, label_row) if l != -100]
        for pred_row, label_row in zip(preds, labels)
    ]
    return {
        'precision': seqeval.metrics.precision_score(true_labels, true_predictions),
        'recall': seqeval.metrics.recall_score(true_labels, true_predictions),
        'f1': seqeval.metrics.f1_score(true_labels, true_predictions),
        'accuracy': seqeval.metrics.accuracy_score(true_labels, true_predictions)
    }
def prepare_ner_dataset(data, tokenizer, max_length=128):
    """Turn [{'text': ..., 'labels': [...]}] items into model-ready examples.

    Labels are aligned to sub-tokens: special tokens and continuation
    sub-tokens get -100 so the loss ignores them.
    NOTE(review): word_ids only map back to `labels` indices if the
    tokenizer sees pre-split words (is_split_into_words=True) or the label
    list is word-aligned with the raw text — confirm with callers.
    """
    dataset = []
    for item in data:
        text = item['text']
        labels = item['labels']
        tokenized_inputs = tokenizer(
            text,
            truncation=True,
            max_length=max_length,
            return_tensors='pt'
        )
        # BUG FIX: word_ids() already returns the id list for this single
        # example; the original's `word_ids()[0]` grabbed only the first
        # element (None for [CLS]) and then tried to iterate over it.
        word_ids = tokenized_inputs.word_ids()
        aligned_labels = []
        previous_word_idx = None
        for word_idx in word_ids:
            if word_idx is None:
                aligned_labels.append(-100)  # special token
            elif word_idx != previous_word_idx:
                aligned_labels.append(labels[word_idx])
            else:
                aligned_labels.append(-100)  # continuation sub-token
            previous_word_idx = word_idx
        dataset.append({
            'input_ids': tokenized_inputs['input_ids'][0],
            'attention_mask': tokenized_inputs['attention_mask'][0],
            'labels': torch.tensor(aligned_labels)
        })
    return dataset
def train_ner_model(data, num_labels, model_name='bert-base-cased'):
    """Fine-tune a BERT token classifier on `data` and save it to ./ner_model.

    `data` must provide 'train' and 'val' splits consumable by
    prepare_ner_dataset.  Returns (model, tokenizer).
    """
    tokenizer = BertTokenizer.from_pretrained(model_name)
    model = BertForTokenClassification.from_pretrained(model_name, num_labels=num_labels)
    # Tokenised, label-aligned dataset splits.
    train_dataset = prepare_ner_dataset(data['train'], tokenizer)
    val_dataset = prepare_ner_dataset(data['val'], tokenizer)
    args = TrainingArguments(
        output_dir='./ner_results',
        num_train_epochs=3,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=16,
        warmup_steps=500,
        weight_decay=0.01,
        logging_dir='./ner_logs',
        evaluation_strategy='epoch'
    )
    trainer = Trainer(
        model=model,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        compute_metrics=compute_ner_metrics
    )
    trainer.train()
    # Persist weights and tokenizer files side by side.
    trainer.save_model('./ner_model')
    tokenizer.save_pretrained('./ner_model')
    return model, tokenizer
def predict_entities(text, model, tokenizer, label_map):
# 分词
inputs = tokenizer(
text,
return_tensors='pt',
truncation=True,
padding=True
)
# 预测
model.eval()
with torch.no_grad():
outputs = model(**inputs)
predictions = torch.argmax(outputs.logits, dim=2)
# 处理预测结果
tokens = tokenizer.convert_ids_to_tokens(inputs['input_ids'][0])
labels = predictions[0].tolist()
# 解析实体
entities = []
current_entity = None
current_entity_type = None
for i, (token, label_idx) in enumerate(zip(tokens, labels)):
label = label_map[label_idx]
if label.startswith('B-'):
# 开始新实体
if current_entity:
entities.append((current_entity_type, current_entity))
current_entity_type = label[2:]
current_entity = token
elif label.startswith('I-') and current_entity:
# 继续当前实体
if not token.startswith('##'): # 处理BERT分词
current_entity += ' '
current_entity += token.replace('##', '')
else:
# 非实体或实体结束
if current_entity:
entities.append((current_entity_type, current_entity))
current_entity = None
current_entity_type = None
# 处理最后一个实体
if current_entity:
entities.append((current_entity_type, current_entity))
return entities词性标注是为句子中的每个词语标注词性(如名词、动词、形容词等)的任务。
序列标注可用于细粒度情感分析,识别文本中表达情感的词语和短语。
任务定义:识别文本中表达积极、消极或中性情感的词语、短语或子句。
应用场景:
通常使用序列标注模型,将情感分析转换为标签预测问题:
分块是识别文本中短语结构(如名词短语、动词短语等)的任务。
常用IOB标注:
高质量的序列标注数据需要专业人员手工标注,成本高昂。
解决方案:
序列标注任务中,通常正类样本(如实体)远少于负类样本(如非实体)。
解决方案:
传统的序列标注模型难以处理嵌套实体(一个实体包含在另一个实体中)。
解决方案:
数据增强可以有效扩充训练数据,提高模型泛化能力。
常用数据增强方法:
Python实现示例:
import nlpaug.augmenter.word as naw
import nlpaug.augmenter.sentence as nas
def augment_sequence_labeling_data(texts, labels, num_aug=3):
# 初始化增强器
aug1 = naw.SynonymAug(aug_src='wordnet') # 同义词替换
aug2 = naw.ContextualWordEmbsAug(model_path='bert-base-uncased', action="substitute") # 上下文词替换
aug3 = nas.BackTranslationAug(from_model_name='facebook/wmt19-en-de', to_model_name='facebook/wmt19-de-en') # 回译
augmenters = [aug1, aug2, aug3]
augmented_texts = []
augmented_labels = []
for text, label_seq in zip(texts, labels):
# 添加原始数据
augmented_texts.append(text)
augmented_labels.append(label_seq)
# 应用数据增强
words = text.split()
for i in range(num_aug):
aug = augmenters[i % len(augmenters)]
try:
if i % len(augmenters) == 2: # 回译
aug_text = aug.augment(text)
aug_words = aug_text.split()
else:
aug_words = aug.augment(words)
# 确保增强后的文本长度与标签序列匹配
# 这里采用简化策略,实际应用中可能需要更复杂的处理
if len(aug_words) == len(words):
augmented_texts.append(' '.join(aug_words))
augmented_labels.append(label_seq)
except:
continue
return augmented_texts, augmented_labels半监督学习利用大量未标注数据提升模型性能。
主要方法:
主动学习通过选择最有价值的样本进行标注,提高数据标注效率。
选择策略:
2025年,大语言模型在序列标注领域带来了革命性变化。
先进的大语言模型如GPT-5、Gemini Ultra可以在零样本条件下执行高质量的序列标注,无需额外训练数据。
工作原理:
通过提示工程,可以有效引导大语言模型执行序列标注任务。
示例提示:
请为以下句子中的每个词语标注其词性(POS),使用Universal POS标签。输出格式为"词语/POS标签",每个词语占一行。
句子:苹果公司今天发布了新款手机。
输出示例:
苹果/NOUN
公司/NOUN
今天/NOUN
发布/VERB
了/PART
新款/ADJ
手机/NOUN
。/PUNCT2025年,参数高效微调技术在序列标注任务中得到广泛应用。
LoRA技术通过低秩分解减少可训练参数,使大模型微调变得高效。
优势:
实现示例:
from transformers import AutoModelForTokenClassification, AutoTokenizer
from peft import get_peft_model, LoraConfig
import torch
def create_lora_ner_model(base_model_name, num_labels):
# 加载基础模型
model = AutoModelForTokenClassification.from_pretrained(base_model_name, num_labels=num_labels)
# 配置LoRA
lora_config = LoraConfig(
r=8, # 秩
lora_alpha=32, # 缩放因子
target_modules=["query", "value"], # 目标模块
lora_dropout=0.1, # Dropout概率
bias="none" # 偏置处理方式
)
# 应用LoRA
lora_model = get_peft_model(model, lora_config)
# 打印可训练参数比例
trainable_params = 0
all_param = 0
for _, param in lora_model.named_parameters():
all_param += param.numel()
if param.requires_grad:
trainable_params += param.numel()
print(f"Trainable params: {trainable_params}")
print(f"All params: {all_param}")
print(f"Trainable%: {100 * trainable_params / all_param:.2f}%")
return lora_modelAdapter技术通过在预训练模型中插入小型可训练模块,实现参数高效微调。
应用优势:
2025年,可持续发展成为AI领域的重要趋势,序列标注也不例外。
绿色序列标注关注模型的能耗和碳足迹,通过各种优化技术减少环境影响。
实现方法:
轻量级序列标注模型针对资源受限环境优化,在保持一定性能的前提下显著减少计算和内存需求。
技术路线:
2025年,序列标注不再局限于纯文本,而是扩展到多模态领域。
图文序列标注结合图像和文本信息,分析跨模态的实体和关系。
应用场景:
语音序列标注直接从语音信号中分析语音单元的标签。
技术挑战:
数据收集策略:
标注指南:
预处理步骤:
Python实现示例:
import re
import string
def preprocess_text(text, lowercase=True, remove_punct=True, remove_digits=False):
    """Normalise raw text.

    Optionally lowercases, strips ASCII punctuation and/or digit runs,
    then collapses all whitespace to single spaces and trims the ends.
    """
    if lowercase:
        text = text.lower()
    if remove_punct:
        # One C-level pass over the string beats chained replace() calls.
        text = text.translate(str.maketrans('', '', string.punctuation))
    if remove_digits:
        text = re.sub(r'\d+', '', text)
    # Collapse runs of whitespace and trim.
    return re.sub(r'\s+', ' ', text).strip()
def prepare_sequence_labeling_data(texts, labels, tokenizer, max_length=128):
# 准备序列标注数据
data = []
for text, label_seq in zip(texts, labels):
# 分词
tokens = tokenizer.tokenize(text)
# 截断
if len(tokens) > max_length - 2: # 预留[CLS]和[SEP]的位置
tokens = tokens[:max_length - 2]
label_seq = label_seq[:max_length - 2]
# 添加特殊标记
tokens = ['[CLS]'] + tokens + ['[SEP]']
input_ids = tokenizer.convert_tokens_to_ids(tokens)
# 处理标签
# 特殊标记的标签设为-100(会被忽略)
aligned_labels = [-100] + label_seq + [-100]
# 填充
padding_length = max_length - len(input_ids)
input_ids += [tokenizer.pad_token_id] * padding_length
aligned_labels += [-100] * padding_length
# 创建注意力掩码
attention_mask = [1] * len(tokens) + [0] * padding_length
data.append({
'input_ids': input_ids,
'attention_mask': attention_mask,
'labels': aligned_labels
})
return data选择合适的序列标注模型需要考虑多个因素:
因素 | 建议 |
|---|---|
数据规模 | 小规模数据:BiLSTM-CRF大规模数据:预训练模型 |
计算资源 | 资源受限:轻量级模型资源充足:预训练模型 |
推理速度要求 | 高实时性:轻量级模型、模型压缩低实时性:复杂模型 |
精度要求 | 高精度:预训练模型、集成方法一般精度:传统方法 |
语言特性 | 形态丰富语言:CRF类模型上下文重要:预训练模型 |
提高序列标注模型性能的关键技巧:
Python实现示例:
from transformers import TrainingArguments, Trainer
def compute_seqeval_metrics(pred):
    """Macro-averaged seqeval metrics for a HuggingFace EvalPrediction.

    BUG FIX: seqeval's `classification_report(output_dict=True)` has no
    'accuracy' key (unlike sklearn's), so the original raised KeyError;
    use the dedicated metric functions instead.
    NOTE(review): seqeval expects IOB tag strings, while these sequences
    carry raw label ids — confirm ids are mapped to tags upstream.
    """
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)
    # Strip positions with the ignore label (-100).
    true_predictions = [
        [p for (p, l) in zip(pred_row, label_row) if l != -100]
        for pred_row, label_row in zip(preds, labels)
    ]
    true_labels = [
        [l for (p, l) in zip(pred_row, label_row) if l != -100]
        for pred_row, label_row in zip(preds, labels)
    ]
    return {
        'precision': seqeval.metrics.precision_score(true_labels, true_predictions, average='macro'),
        'recall': seqeval.metrics.recall_score(true_labels, true_predictions, average='macro'),
        'f1': seqeval.metrics.f1_score(true_labels, true_predictions, average='macro'),
        'accuracy': seqeval.metrics.accuracy_score(true_labels, true_predictions)
    }
def train_sequence_tagger(model, train_dataset, val_dataset, epochs=5, learning_rate=2e-5):
# 设置训练参数
training_args = TrainingArguments(
output_dir='./results',
num_train_epochs=epochs,
per_device_train_batch_size=16,
per_device_eval_batch_size=16,
warmup_steps=500, # 预热步数
weight_decay=0.01, # 权重衰减
logging_dir='./logs',
evaluation_strategy='epoch', # 每个epoch评估一次
save_strategy='epoch', # 每个epoch保存一次模型
load_best_model_at_end=True, # 训练结束后加载最佳模型
metric_for_best_model='f1', # 使用F1分数作为最佳模型的指标
gradient_accumulation_steps=2, # 梯度累积步数
gradient_checkpointing=True, # 使用梯度检查点节省内存
fp16=True # 混合精度训练
)
# 初始化Trainer
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=val_dataset,
compute_metrics=compute_seqeval_metrics
)
# 开始训练
trainer.train()
# 评估最佳模型
eval_results = trainer.evaluate()
print(f"Evaluation results: {eval_results}")
return trainer, eval_results序列标注模型的评估应考虑多方面因素:
Python实现示例:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
import numpy as np
def analyze_ner_errors(y_true, y_pred, label_map):
    """Plot a token-level confusion matrix for NER labels and print the most
    frequent confusions.

    y_true / y_pred: per-sequence label-id lists; -100 positions are
    ignored.  label_map: {label_id: label_string}.  Returns the error list
    [(true_label, pred_label, count), ...] sorted by count.
    """
    # Flatten both label streams, dropping ignored (-100) positions.
    flat_true = np.concatenate([[l for l in seq if l != -100] for seq in y_true])
    flat_pred = np.concatenate([[p for p, l in zip(pred, true) if l != -100]
                                for pred, true in zip(y_pred, y_true)])
    # ROBUSTNESS FIX: pin the label order so the matrix always has one
    # row/column per label_map entry; without `labels=` a class that never
    # occurs shrinks the matrix and the cm[i, j] loop below goes out of
    # bounds.
    label_ids = list(range(len(label_map)))
    cm = confusion_matrix(flat_true, flat_pred, labels=label_ids)
    plt.figure(figsize=(10, 8))
    labels = [label_map[i] for i in label_ids]
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=labels, yticklabels=labels)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.savefig('ner_confusion_matrix.png')
    plt.close()  # release the figure instead of leaking it
    # Tally off-diagonal cells as (true, predicted, count) errors.
    errors = []
    for i in label_ids:
        for j in label_ids:
            if i != j and cm[i, j] > 0:
                errors.append((label_map[i], label_map[j], cm[i, j]))
    errors.sort(key=lambda x: x[2], reverse=True)
    print("Top 10 Common Errors:")
    for true_label, pred_label, count in errors[:10]:
        print(f"True: {true_label}, Pred: {pred_label}, Count: {count}")
    return errors
def evaluate_entity_boundaries(y_true, y_pred, label_map):
# 评估实体边界检测性能
true_entities = []
pred_entities = []
# 解析真实实体
for seq_idx, seq in enumerate(y_true):
current_entity = None
current_type = None
start_idx = None
for i, label in enumerate(seq):
if label == -100:
continue
label_str = label_map[label]
if label_str.startswith('B-'):
# 结束当前实体
if current_entity:
true_entities.append((seq_idx, start_idx, i-1, current_type))
# 开始新实体
current_type = label_str[2:]
current_entity = True
start_idx = i
elif label_str.startswith('I-') and current_entity:
# 继续当前实体
pass
else:
# 结束当前实体
if current_entity:
true_entities.append((seq_idx, start_idx, i-1, current_type))
current_entity = None
# 处理最后一个实体
if current_entity:
true_entities.append((seq_idx, start_idx, len(seq)-1, current_type))
# 解析预测实体
for seq_idx, seq in enumerate(y_pred):
current_entity = None
current_type = None
start_idx = None
for i, label in enumerate(seq):
if label == -100:
continue
label_str = label_map[label]
if label_str.startswith('B-'):
# 结束当前实体
if current_entity:
pred_entities.append((seq_idx, start_idx, i-1, current_type))
# 开始新实体
current_type = label_str[2:]
current_entity = True
start_idx = i
elif label_str.startswith('I-') and current_entity:
# 继续当前实体
pass
else:
# 结束当前实体
if current_entity:
pred_entities.append((seq_idx, start_idx, i-1, current_type))
current_entity = None
# 处理最后一个实体
if current_entity:
pred_entities.append((seq_idx, start_idx, len(seq)-1, current_type))
# 计算边界准确率
correct_boundaries = 0
for true_ent in true_entities:
for pred_ent in pred_entities:
if true_ent[0] == pred_ent[0] and true_ent[3] == pred_ent[3]: # 同一句话,同类型
if true_ent[1] == pred_ent[1] and true_ent[2] == pred_ent[2]: # 边界完全匹配
correct_boundaries += 1
break
boundary_precision = correct_boundaries / len(pred_entities) if pred_entities else 0
boundary_recall = correct_boundaries / len(true_entities) if true_entities else 0
boundary_f1 = 2 * boundary_precision * boundary_recall / (boundary_precision + boundary_recall) if (boundary_precision + boundary_recall) > 0 else 0
print(f"Boundary Precision: {boundary_precision:.4f}")
print(f"Boundary Recall: {boundary_recall:.4f}")
print(f"Boundary F1: {boundary_f1:.4f}")
return {
'precision': boundary_precision,
'recall': boundary_recall,
'f1': boundary_f1,
'true_entities': len(true_entities),
'pred_entities': len(pred_entities),
'correct_boundaries': correct_boundaries
}针对序列标注模型的常见优化策略:
序列标注模型的部署需要考虑多个因素:
部署场景 | 推荐方法 | 优势 | 劣势 |
|---|---|---|---|
服务器部署 | Docker容器 | 环境一致性 | 资源消耗较大 |
边缘设备 | 模型量化、剪枝 | 低延迟、隐私保护 | 精度可能下降 |
云端服务 | API服务 | 易于扩展 | 依赖网络连接 |
移动应用 | TFLite、ONNX Runtime | 本地运行 | 受设备性能限制 |
提高序列标注模型推理速度的方法:
Python实现示例(使用ONNX Runtime进行推理加速):
import torch
import onnx
import onnxruntime as ort
import numpy as np
def export_to_onnx(model, tokenizer, onnx_path='sequence_tagger.onnx', max_length=128):
    """Export `model` to ONNX with dynamic batch/sequence axes, then validate.

    Returns the path of the written ONNX file.
    """
    # Dummy inputs just fix dtypes/ranks; the axes below stay dynamic.
    ids = torch.zeros((1, max_length), dtype=torch.long)
    attn = torch.zeros((1, max_length), dtype=torch.long)
    dyn_axes = {0: 'batch_size', 1: 'sequence_length'}
    torch.onnx.export(
        model,
        (ids, attn),
        onnx_path,
        export_params=True,
        opset_version=12,
        do_constant_folding=True,
        input_names=['input_ids', 'attention_mask'],
        output_names=['logits'],
        dynamic_axes={
            'input_ids': dyn_axes,
            'attention_mask': dyn_axes,
            'logits': dyn_axes
        }
    )
    print(f"Model exported to {onnx_path}")
    # Structural validation of the exported graph.
    onnx.checker.check_model(onnx.load(onnx_path))
    print("ONNX model is valid")
    return onnx_path
def create_ort_session(onnx_path):
    """Open an ONNX Runtime session, preferring CUDA with CPU fallback."""
    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
    return ort.InferenceSession(onnx_path, providers=providers)
def predict_with_onnx(session, tokenizer, text, max_length=128):
    """Tokenize `text` and run inference through an ONNX Runtime session.

    Returns an array of per-token label ids, shape [batch, seq_len].
    """
    encoded = tokenizer(
        text,
        return_tensors='np',
        padding='max_length',
        truncation=True,
        max_length=max_length
    )
    feed = {
        'input_ids': encoded['input_ids'],
        'attention_mask': encoded['attention_mask']
    }
    # run() returns one array per requested output name.
    (logits,) = session.run(['logits'], feed)
    return np.argmax(logits, axis=2)
def optimize_inference_speed(model, tokenizer, texts, max_length=128):
# 优化推理速度的综合方案
# 1. 批处理
batch_size = 32
results = []
for i in range(0, len(texts), batch_size):
batch_texts = texts[i:i+batch_size]
# 批量分词
inputs = tokenizer(
batch_texts,
return_tensors='pt',
padding=True,
truncation=True,
max_length=max_length
)
# 批量预测
model.eval()
with torch.no_grad():
outputs = model(**inputs)
predictions = torch.argmax(outputs.logits, dim=2)
results.extend(predictions.tolist())
return results序列标注作为NLP的基础任务,其发展经历了从传统统计方法到深度学习,再到预训练语言模型的演进过程。不同的模型各有优势:
序列标注技术的未来发展将呈现以下趋势:
对于序列标注领域的研究,以下方向值得关注:
在实际应用序列标注技术时,建议考虑以下几点:
通过本详细讲解,我们全面介绍了序列标注技术的发展历程、核心算法、实现方法和最新进展。希望能为读者在实际应用中提供有价值的参考和指导。随着NLP技术的不断发展,序列标注作为基础任务,将继续发挥重要作用,并在更多领域得到应用和创新。
序列标注技术演进路线图
传统方法 → 深度学习方法 → 预训练模型 → 大语言模型
↓ ↓ ↓ ↓
HMM BiLSTM BERT系列 GPT系列
↓ ↓ ↓ ↓
CRF BiLSTM-CRF 多语言模型 零样本标注