import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import warnings
warnings.filterwarnings('ignore')
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
print("环境设置完成,开始模型可解释性探索...")应用领域 | 可解释性需求 | 风险案例 | 解决方案 |
|---|---|---|---|
金融信贷 | 监管合规、客户沟通 | 贷款拒绝需要合理解释 | SHAP值、特征重要性 |
医疗诊断 | 医生信任、患者安全 | 误诊导致医疗事故 | LIME局部解释、注意力机制 |
司法决策 | 公平性、透明度 | 算法偏见引发社会争议 | 公平性指标、反事实解释 |
自动驾驶 | 事故责任认定 | 事故原因难以追溯 | 决策路径可视化 |
招聘系统 | 避免歧视、合规性 | 性别种族歧视诉讼 | 公平性审计、偏见检测 |

线性模型虽然简单,但在可解释性方面具有天然优势,是建立模型信任的良好起点。
class InterpretableBaseModels:
"""可解释基础模型类"""
def __init__(self):
self.models = {}
self.explanations = {}
def create_credit_risk_dataset(self, n_samples=10000):
"""创建信用风险评估数据集"""
np.random.seed(42)
# 生成相对真实的信用数据
data = {
'age': np.random.normal(45, 15, n_samples),
'income': np.random.lognormal(11, 0.5, n_samples), # 年收入
'credit_score': np.random.normal(700, 100, n_samples),
'debt_to_income': np.random.beta(2, 5, n_samples) * 100, # 债务收入比
'years_employed': np.random.exponential(10, n_samples),
'loan_amount': np.random.gamma(2, 10000, n_samples),
'savings_balance': np.random.exponential(50000, n_samples),
'existing_loans': np.random.poisson(2, n_samples),
'education_level': np.random.choice(['高中', '本科', '硕士', '博士'],
n_samples, p=[0.3, 0.4, 0.2, 0.1]),
'home_ownership': np.random.choice(['租房', '有房贷', '全款房'],
n_samples, p=[0.4, 0.4, 0.2])
}
df = pd.DataFrame(data)
# 基于业务逻辑生成违约标签
# 违约概率与多个因素相关
default_prob = (
0.1 +
0.3 * (df['debt_to_income'] > 40) +
0.2 * (df['credit_score'] < 600) +
0.15 * (df['loan_amount'] > df['income'] * 0.5) +
0.1 * (df['years_employed'] < 2) -
0.1 * (df['education_level'].isin(['硕士', '博士'])) -
0.05 * (df['home_ownership'] == '全款房') +
np.random.normal(0, 0.1, n_samples)
)
df['default'] = (default_prob > 0.5).astype(int)
return df
def prepare_features(self, df):
"""特征预处理"""
df_processed = df.copy()
# 数值特征标准化
numerical_features = ['age', 'income', 'credit_score', 'debt_to_income',
'years_employed', 'loan_amount', 'savings_balance', 'existing_loans']
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
df_processed[numerical_features] = scaler.fit_transform(df_processed[numerical_features])
# 类别特征编码
df_processed = pd.get_dummies(df_processed, columns=['education_level', 'home_ownership'])
return df_processed, numerical_features
def train_interpretable_models(self, X, y):
"""训练可解释模型"""
# 划分训练测试集
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42, stratify=y
)
# 1. 逻辑回归(高可解释性)
lr_model = LogisticRegression(max_iter=1000, random_state=42)
lr_model.fit(X_train, y_train)
# 2. 决策树(中等可解释性)
from sklearn.tree import DecisionTreeClassifier, export_text
dt_model = DecisionTreeClassifier(max_depth=5, random_state=42)
dt_model.fit(X_train, y_train)
# 3. 线性判别分析
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
lda_model = LinearDiscriminantAnalysis()
lda_model.fit(X_train, y_train)
self.models = {
'Logistic Regression': lr_model,
'Decision Tree': dt_model,
'LDA': lda_model
}
# 模型性能评估
performance = {}
for name, model in self.models.items():
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
performance[name] = accuracy
return X_train, X_test, y_train, y_test, performance
def explain_linear_model(self, model, feature_names, X_train):
"""解释线性模型"""
explanations = {}
# 系数分析
if hasattr(model, 'coef_'):
coefficients = model.coef_[0]
feature_importance = pd.DataFrame({
'feature': feature_names,
'coefficient': coefficients,
'abs_coefficient': abs(coefficients)
}).sort_values('abs_coefficient', ascending=False)
explanations['coefficients'] = feature_importance
# 特征重要性可视化
plt.figure(figsize=(10, 6))
top_features = feature_importance.head(10)
colors = ['red' if x < 0 else 'green' for x in top_features['coefficient']]
plt.barh(top_features['feature'], top_features['coefficient'], color=colors)
plt.xlabel('系数大小')
plt.title('逻辑回归特征系数(红色负向,绿色正向)')
plt.tight_layout()
plt.show()
# 概率解释
if hasattr(model, 'predict_proba'):
# 显示基准概率
baseline_prob = model.predict_proba(X_train[:1])[0][1]
explanations['baseline_probability'] = baseline_prob
print(f"基准违约概率: {baseline_prob:.3f}")
return explanations
def explain_decision_tree(self, model, feature_names):
"""解释决策树模型"""
from sklearn.tree import plot_tree
explanations = {}
# 树结构可视化
plt.figure(figsize=(20, 10))
plot_tree(model,
feature_names=feature_names,
class_names=['正常', '违约'],
filled=True,
rounded=True,
fontsize=10)
plt.title('决策树结构可视化')
plt.show()
# 特征重要性
if hasattr(model, 'feature_importances_'):
importance_df = pd.DataFrame({
'feature': feature_names,
'importance': model.feature_importances_
}).sort_values('importance', ascending=False)
explanations['feature_importance'] = importance_df
# 可视化特征重要性
plt.figure(figsize=(10, 6))
plt.barh(importance_df['feature'][:10], importance_df['importance'][:10])
plt.xlabel('特征重要性')
plt.title('决策树特征重要性排名')
plt.tight_layout()
plt.show()
# 提取决策规则
from sklearn.tree import export_text
tree_rules = export_text(model, feature_names=feature_names)
explanations['decision_rules'] = tree_rules
print("决策树规则示例:")
print(tree_rules[:1000]) # 显示前1000字符
return explanations
# 使用可解释基础模型
interpreter = InterpretableBaseModels()
credit_data = interpreter.create_credit_risk_dataset(5000)
processed_data, num_features = interpreter.prepare_features(credit_data)
# 准备特征和目标
X = processed_data.drop('default', axis=1)
y = processed_data['default']
feature_names = X.columns.tolist()
# 训练模型
X_train, X_test, y_train, y_test, performance = interpreter.train_interpretable_models(X, y)
print("模型性能比较:")
for model, acc in performance.items():
print(f"{model}: 准确率 = {acc:.3f}")
# 解释逻辑回归模型
lr_explanations = interpreter.explain_linear_model(
interpreter.models['Logistic Regression'], feature_names, X_train
)
print("\n逻辑回归关键发现:")
print("正向影响特征(增加违约概率):")
positive_effects = lr_explanations['coefficients'][lr_explanations['coefficients']['coefficient'] > 0]
print(positive_effects[['feature', 'coefficient']].head())
print("\n负向影响特征(降低违约概率):")
negative_effects = lr_explanations['coefficients'][lr_explanations['coefficients']['coefficient'] < 0]
print(negative_effects[['feature', 'coefficient']].head())
# 解释决策树模型
dt_explanations = interpreter.explain_decision_tree(
interpreter.models['Decision Tree'], feature_names
)决策树和规则-based模型提供直观的"如果-那么"规则,容易被业务人员理解。

SHAP(SHapley Additive exPlanations)基于博弈论,为每个特征分配预测贡献值,是目前最强大的模型解释工具。
class SHAPExplainer:
"""SHAP值解释器"""
def __init__(self):
self.shap_values = {}
self.explainers = {}
def comprehensive_shap_analysis(self, model, X, y, model_name='model'):
"""全面的SHAP分析"""
try:
import shap
except ImportError:
print("请先安装SHAP库: pip install shap")
return None
# 创建解释器
if hasattr(model, 'predict_proba'):
explainer = shap.TreeExplainer(model) if hasattr(model, 'estimators_') else shap.Explainer(model, X)
shap_values = explainer(X)
else:
explainer = shap.Explainer(model, X)
shap_values = explainer(X)
self.explainers[model_name] = explainer
self.shap_values[model_name] = shap_values
# 创建综合可视化
self._create_shap_visualizations(shap_values, X, y, model_name)
return explainer, shap_values
def _create_shap_visualizations(self, shap_values, X, y, model_name):
"""创建SHAP可视化"""
import shap
print(f"\n=== {model_name} SHAP分析结果 ===\n")
# 1. 特征重要性总结图
plt.figure(figsize=(10, 8))
shap.summary_plot(shap_values, X, show=False)
plt.title(f'{model_name} - SHAP特征重要性总结')
plt.tight_layout()
plt.show()
# 2. 特征重要性条形图
plt.figure(figsize=(10, 6))
shap.summary_plot(shap_values, X, plot_type="bar", show=False)
plt.title(f'{model_name} - 平均SHAP绝对值重要性')
plt.tight_layout()
plt.show()
# 3. 依赖图(对最重要特征)
feature_names = X.columns
if hasattr(shap_values, 'values'):
mean_shap = np.abs(shap_values.values).mean(0)
else:
mean_shap = np.abs(shap_values).mean(0)
top_feature_idx = np.argmax(mean_shap)
top_feature = feature_names[top_feature_idx]
plt.figure(figsize=(12, 6))
shap.dependence_plot(top_feature_idx, shap_values.values if hasattr(shap_values, 'values') else shap_values,
X, display_features=X, show=False)
plt.title(f'{model_name} - {top_feature}的SHAP依赖图')
plt.tight_layout()
plt.show()
# 4. 个体预测解释
self._explain_individual_predictions(shap_values, X, y, model_name)
def _explain_individual_predictions(self, shap_values, X, y, model_name):
"""解释个体预测"""
import shap
# 选择几个有代表性的样本进行解释
n_samples = min(3, len(X))
# 找到预测概率接近0.5的边界案例
if hasattr(shap_values, 'values'):
predicted_proba = shap_values.values[:, 1] if shap_values.values.shape[1] > 1 else shap_values.values[:, 0]
else:
predicted_proba = shap_values[:, 1] if shap_values.shape[1] > 1 else shap_values[:, 0]
borderline_indices = np.argsort(np.abs(predicted_proba - 0.5))[:n_samples]
for i, idx in enumerate(borderline_indices):
print(f"\n--- 样本 {i+1} 个体解释 (索引: {idx}) ---")
print(f"真实标签: {y.iloc[idx]}, 预测概率: {predicted_proba[idx]:.3f}")
# 创建力力图
plt.figure(figsize=(12, 4))
shap.waterfall_plot(shap_values[idx], show=False)
plt.title(f'{model_name} - 样本 {idx} 预测解释')
plt.tight_layout()
plt.show()
# 显示特征值
sample_features = X.iloc[idx]
print("特征值:")
for feature, value in sample_features.items():
print(f" {feature}: {value:.3f}")
def compare_models_shap(self, models_dict, X, y):
"""比较多个模型的SHAP解释"""
import shap
comparison_results = {}
for name, model in models_dict.items():
print(f"\n正在分析 {name}...")
explainer, shap_vals = self.comprehensive_shap_analysis(model, X, y, name)
comparison_results[name] = {
'explainer': explainer,
'shap_values': shap_vals
}
# 比较特征重要性
self._compare_feature_importance(comparison_results, X.columns)
return comparison_results
def _compare_feature_importance(self, comparison_results, feature_names):
"""比较不同模型的特征重要性"""
importance_comparison = {}
for model_name, results in comparison_results.items():
shap_vals = results['shap_values']
if hasattr(shap_vals, 'values'):
mean_abs_shap = np.abs(shap_vals.values).mean(0)
else:
mean_abs_shap = np.abs(shap_vals).mean(0)
importance_df = pd.DataFrame({
'feature': feature_names,
'importance': mean_abs_shap
}).sort_values('importance', ascending=False)
importance_comparison[model_name] = importance_df
# 创建比较可视化
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
models = list(importance_comparison.keys())
for i, model in enumerate(models):
ax = axes[i//2, i%2]
top_features = importance_comparison[model].head(10)
ax.barh(top_features['feature'], top_features['importance'])
ax.set_title(f'{model} - Top 10特征重要性')
ax.set_xlabel('平均|SHAP值|')
plt.tight_layout()
plt.show()
# 输出一致性分析
self._analyze_importance_consistency(importance_comparison)
def _analyze_importance_consistency(self, importance_comparison):
"""分析特征重要性的一致性"""
# 提取每个模型的前10重要特征
top_features_by_model = {}
for model, importance_df in importance_comparison.items():
top_features = importance_df.head(10)['feature'].tolist()
top_features_by_model[model] = set(top_features)
# 计算模型间的一致性
models = list(top_features_by_model.keys())
consistency_matrix = pd.DataFrame(index=models, columns=models)
for i, model1 in enumerate(models):
for j, model2 in enumerate(models):
if i == j:
consistency_matrix.loc[model1, model2] = 1.0
else:
set1 = top_features_by_model[model1]
set2 = top_features_by_model[model2]
overlap = len(set1.intersection(set2))
consistency = overlap / min(len(set1), len(set2))
consistency_matrix.loc[model1, model2] = consistency
print("\n模型间特征重要性一致性矩阵:")
print(consistency_matrix)
# 训练一个黑盒模型进行比较
from sklearn.ensemble import RandomForestClassifier
rf_model = RandomForestClassifier(n_estimators=100, random_state=42)
rf_model.fit(X_train, y_train)
# 添加到模型字典
all_models = {
'Logistic Regression': interpreter.models['Logistic Regression'],
'Decision Tree': interpreter.models['Decision Tree'],
'Random Forest': rf_model
}
# 进行SHAP分析
shap_explainer = SHAPExplainer()
shap_comparison = shap_explainer.compare_models_shap(all_models, X_test, y_test)LIME(Local Interpretable Model-agnostic Explanations)通过构建局部代理模型来解释个体预测。
class LIMEExplainer:
"""LIME解释器"""
def __init__(self):
self.explanations = {}
def lime_local_explanations(self, model, X, y, feature_names, class_names=None):
"""LIME局部解释"""
try:
import lime
import lime.lime_tabular
except ImportError:
print("请先安装LIME库: pip install lime")
return None
if class_names is None:
class_names = ['正常', '违约']
# 创建LIME解释器
explainer = lime.lime_tabular.LimeTabularExplainer(
X.values,
feature_names=feature_names,
class_names=class_names,
mode='classification',
random_state=42
)
# 选择几个有代表性的样本
representative_indices = self._select_representative_samples(model, X, y)
explanations = {}
for idx in representative_indices:
print(f"\n=== 样本 {idx} 的LIME解释 ===")
explanation = self._explain_single_instance(
explainer, model, X.iloc[idx], y.iloc[idx], idx
)
explanations[idx] = explanation
self.explanations = explanations
return explainer, explanations
def _select_representative_samples(self, model, X, y):
"""选择有代表性的样本"""
# 获取预测概率
if hasattr(model, 'predict_proba'):
y_pred_proba = model.predict_proba(X)[:, 1]
else:
y_pred_proba = model.predict(X)
# 选择不同类型的样本
indices = []
# 1. 高置信度正确预测
correct_predictions = (y_pred_proba > 0.7) & (y == 1) | (y_pred_proba < 0.3) & (y == 0)
if correct_predictions.any():
indices.append(correct_predictions.idxmax())
# 2. 边界案例(预测概率接近0.5)
borderline = np.abs(y_pred_proba - 0.5) < 0.1
if borderline.any():
indices.append(borderline.idxmax())
# 3. 错误预测
y_pred = (y_pred_proba > 0.5).astype(int)
wrong_predictions = y_pred != y
if wrong_predictions.any():
indices.append(wrong_predictions.idxmax())
return indices[:3] # 最多返回3个样本
def _explain_single_instance(self, explainer, model, instance, true_label, idx):
"""解释单个实例"""
import lime
# 生成解释
explanation = explainer.explain_instance(
instance.values,
model.predict_proba if hasattr(model, 'predict_proba') else lambda x: np.column_stack([1-model.predict(x), model.predict(x)]),
num_features=10
)
# 显示解释结果
print(f"真实标签: {true_label}")
# 显示预测概率
if hasattr(model, 'predict_proba'):
proba = model.predict_proba(instance.values.reshape(1, -1))[0]
print(f"预测概率: 正常={proba[0]:.3f}, 违约={proba[1]:.3f}")
# 显示特征贡献
print("\n特征贡献(正向增加违约概率,负向降低违约概率):")
for feature, weight in explanation.as_list():
print(f" {feature}: {weight:.4f}")
# 可视化解释
fig = explanation.as_pyplot_figure()
plt.title(f'样本 {idx} 的LIME解释')
plt.tight_layout()
plt.show()
return explanation
# 使用LIME解释器
lime_explainer = LIMEExplainer()
lime_results = lime_explainer.lime_local_explanations(
rf_model, X_test, y_test, feature_names
)部分依赖图显示特征对预测结果的平均边际效应,是理解特征影响的重要工具。
技术名称 | 解释层次 | 适用场景 | 优势 | 限制 |
|---|---|---|---|---|
部分依赖图PDP | 全局解释 | 理解特征平均影响 | 直观显示趋势 | 假设特征独立 |
个体条件期望ICE | 局部解释 | 分析个体差异 | 显示异质性 | 可能过于详细 |
累积局部效应ALE | 全局解释 | 处理相关特征 | 解决相关性问题 | 计算复杂度高 |
特征交互作用 | 全局解释 | 发现交互效应 | 揭示复杂关系 | 维度灾难风险 |
反事实解释 | 局部解释 | 提供改进建议 | actionable见解 | 生成质量依赖方法 |
class AdvancedModelExplainer:
"""高级模型解释器"""
def __init__(self):
self.analysis_results = {}
def partial_dependence_analysis(self, model, X, y, feature_names, top_features=5):
"""部分依赖分析"""
from sklearn.inspection import PartialDependenceDisplay
# 选择最重要的特征
if hasattr(model, 'feature_importances_'):
importance_scores = model.feature_importances_
else:
# 使用 permutation importance
from sklearn.inspection import permutation_importance
perm_importance = permutation_importance(model, X, y, n_repeats=10, random_state=42)
importance_scores = perm_importance.importances_mean
top_indices = np.argsort(importance_scores)[-top_features:][::-1]
top_feature_names = [feature_names[i] for i in top_indices]
print(f"分析前 {top_features} 个最重要特征的部分依赖:")
print(top_feature_names)
# 创建PDP图
fig, ax = plt.subplots(figsize=(15, 10))
PartialDependenceDisplay.from_estimator(
model, X, features=top_indices,
feature_names=feature_names,
ax=ax
)
plt.suptitle('部分依赖图 (PDP) - 显示特征对预测的平均影响')
plt.tight_layout()
plt.show()
# 创建ICE图
self._individual_conditional_expectation(model, X, top_indices, feature_names)
return top_feature_names
def _individual_conditional_expectation(self, model, X, feature_indices, feature_names):
"""个体条件期望图"""
from sklearn.inspection import PartialDependenceDisplay
fig, axes = plt.subplots(2, min(3, len(feature_indices)), figsize=(15, 8))
axes = axes.flatten()
for i, feature_idx in enumerate(feature_indices[:len(axes)]):
PartialDependenceDisplay.from_estimator(
model, X, features=[feature_idx],
feature_names=feature_names,
kind='individual',
ax=axes[i]
)
axes[i].set_title(f'{feature_names[feature_idx]} - ICE图')
plt.suptitle('个体条件期望 (ICE) - 显示个体预测的变化')
plt.tight_layout()
plt.show()
def feature_interaction_analysis(self, model, X, feature_names):
"""特征交互作用分析"""
try:
from sklearn.inspection import PartialDependenceDisplay
# 选择两个最重要的特征分析交互
if hasattr(model, 'feature_importances_'):
importance_scores = model.feature_importances_
else:
importance_scores = np.abs(model.coef_[0]) if hasattr(model, 'coef_') else np.ones(X.shape[1])
top_indices = np.argsort(importance_scores)[-2:][::-1]
# 创建交互图
fig, ax = plt.subplots(figsize=(10, 8))
PartialDependenceDisplay.from_estimator(
model, X, features=[(top_indices[0], top_indices[1])],
feature_names=feature_names,
ax=ax
)
plt.title('特征交互图 - 显示两个特征的联合影响')
plt.tight_layout()
plt.show()
print(f"分析特征交互: {feature_names[top_indices[0]]} 和 {feature_names[top_indices[1]]}")
except Exception as e:
print(f"交互分析失败: {e}")
def counterfactual_explanations(self, model, X, y, feature_names, instance_idx=0):
"""反事实解释"""
print(f"\n=== 反事实解释分析 ===\n")
# 选择要解释的实例
instance = X.iloc[instance_idx:instance_idx+1]
true_label = y.iloc[instance_idx]
if hasattr(model, 'predict_proba'):
original_prediction = model.predict_proba(instance)[0][1]
else:
original_prediction = model.predict(instance)[0]
print(f"实例 {instance_idx} 的原始情况:")
print(f"真实标签: {true_label}, 预测违约概率: {original_prediction:.3f}")
# 生成反事实解释
counterfactuals = self._generate_counterfactuals(model, instance, feature_names, target_prob=0.3)
return counterfactuals
def _generate_counterfactuals(self, model, instance, feature_names, target_prob=0.3):
"""生成反事实解释"""
import copy
instance_copy = copy.deepcopy(instance.values[0])
original_features = instance_copy.copy()
# 简单的反事实生成策略
feature_changes = []
current_prob = model.predict_proba(instance)[0][1]
print(f"\n当前违约概率: {current_prob:.3f}")
print(f"目标违约概率: {target_prob:.3f}")
print("\n如何降低违约概率:")
# 尝试调整每个特征
for i, (feature, original_value) in enumerate(zip(feature_names, original_features)):
if current_prob <= target_prob:
break
# 创建修改后的实例
modified_instance = original_features.copy()
# 根据特征类型进行合理修改
if 'debt' in feature.lower() or 'loan' in feature.lower():
# 债务相关特征:降低值
modified_value = original_value * 0.7 # 降低30%
elif 'income' in feature.lower() or 'savings' in feature.lower() or 'credit' in feature.lower():
# 收入、储蓄、信用相关:增加值
modified_value = original_value * 1.3 # 增加30%
else:
# 其他特征:轻微调整
modified_value = original_value * 1.1
modified_instance[i] = modified_value
new_prob = model.predict_proba(modified_instance.reshape(1, -1))[0][1]
if new_prob < current_prob: # 如果修改降低了违约概率
change_percent = (modified_value - original_value) / original_value * 100
prob_change = (new_prob - current_prob) / current_prob * 100
print(f" • {feature}: {original_value:.2f} → {modified_value:.2f} ({change_percent:+.1f}%)")
print(f" 违约概率: {current_prob:.3f} → {new_prob:.3f} ({prob_change:+.1f}%)")
feature_changes.append({
'feature': feature,
'original': original_value,
'modified': modified_value,
'change_percent': change_percent,
'old_prob': current_prob,
'new_prob': new_prob,
'prob_change': prob_change
})
# 更新当前概率
current_prob = new_prob
return feature_changes
# 使用高级解释技术
advanced_explainer = AdvancedModelExplainer()
# 部分依赖分析
important_features = advanced_explainer.partial_dependence_analysis(
rf_model, X_test, y_test, feature_names
)
# 特征交互分析
advanced_explainer.feature_interaction_analysis(rf_model, X_test, feature_names)
# 反事实解释
counterfactuals = advanced_explainer.counterfactual_explanations(
rf_model, X_test, y_test, feature_names
)
在金融风控中,模型可解释性不仅是技术需求,更是监管和业务的硬性要求。
class BusinessInterpreter:
"""业务导向的模型解释器"""
def __init__(self):
self.business_rules = {}
self.compliance_checks = {}
def create_business_friendly_explanations(self, model, X, y, feature_names,
customer_data=None, business_context=None):
"""创建业务友好的解释"""
explanations = {}
# 1. 监管合规检查
compliance_report = self._regulatory_compliance_check(model, X, feature_names)
explanations['compliance'] = compliance_report
# 2. 业务规则对齐
alignment_analysis = self._business_rule_alignment(model, X, feature_names)
explanations['business_alignment'] = alignment_analysis
# 3. 客户沟通模板
communication_templates = self._create_communication_templates(model, X, y, feature_names)
explanations['communication'] = communication_templates
# 4. 风险洞察报告
risk_insights = self._generate_risk_insights(model, X, y, feature_names)
explanations['risk_insights'] = risk_insights
return explanations
def _regulatory_compliance_check(self, model, X, feature_names):
"""监管合规检查"""
compliance_issues = []
# 检查是否使用敏感特征
sensitive_features = ['race', 'gender', 'age', 'religion', 'nationality']
used_sensitive_features = []
for feature in feature_names:
for sensitive in sensitive_features:
if sensitive in feature.lower():
used_sensitive_features.append(feature)
if used_sensitive_features:
compliance_issues.append({
'issue': '使用敏感特征',
'features': used_sensitive_features,
'severity': '高',
'recommendation': '移除或严格监控这些特征的使用'
})
# 检查特征重要性合理性
if hasattr(model, 'feature_importances_'):
importance_scores = model.feature_importances_
top_features = [feature_names[i] for i in np.argsort(importance_scores)[-5:][::-1]]
# 检查是否与业务常识一致
business_critical = ['credit_score', 'income', 'debt']
missing_critical = [feat for feat in business_critical
if not any(feat in top_feat for top_feat in top_features)]
if missing_critical:
compliance_issues.append({
'issue': '关键业务特征重要性不足',
'missing_features': missing_critical,
'severity': '中',
'recommendation': '检查模型是否捕获了关键业务逻辑'
})
return {
'issues': compliance_issues,
'status': '通过' if not compliance_issues else '需审查'
}
def _business_rule_alignment(self, model, X, feature_names):
"""业务规则对齐分析"""
alignment_results = {}
# 定义业务规则期望
business_expectations = {
'credit_score': {'direction': 'negative', 'strength': 'strong'},
'income': {'direction': 'negative', 'strength': 'strong'},
'debt_to_income': {'direction': 'positive', 'strength': 'strong'},
'years_employed': {'direction': 'negative', 'strength': 'medium'}
}
# 分析模型与业务规则的一致性
consistency_scores = {}
for feature, expectation in business_expectations.items():
# 找到对应的特征
matching_features = [f for f in feature_names if feature in f.lower()]
if matching_features:
actual_feature = matching_features[0]
feature_idx = feature_names.index(actual_feature)
# 分析特征影响方向
if hasattr(model, 'coef_'):
# 线性模型
coef = model.coef_[0][feature_idx]
actual_direction = 'negative' if coef < 0 else 'positive'
else:
# 使用SHAP分析方向
try:
import shap
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X)
mean_shap = np.mean(shap_values[1][:, feature_idx] if isinstance(shap_values, list) else shap_values[:, feature_idx])
actual_direction = 'negative' if mean_shap < 0 else 'positive'
except:
actual_direction = 'unknown'
# 计算一致性
expected_direction = expectation['direction']
consistent = (actual_direction == expected_direction)
consistency_scores[feature] = {
'expected': expected_direction,
'actual': actual_direction,
'consistent': consistent,
'feature_used': actual_feature
}
alignment_results['feature_consistency'] = consistency_scores
alignment_results['alignment_score'] = sum([1 for v in consistency_scores.values() if v['consistent']]) / len(consistency_scores)
return alignment_results
def _create_communication_templates(self, model, X, y, feature_names):
"""创建客户沟通模板"""
templates = {}
# 批准决策模板
approval_template = """
恭喜!您的贷款申请已获得批准。
决策主要基于以下积极因素:
{positive_factors}
建议继续保持良好的信用习惯。
"""
# 拒绝决策模板
rejection_template = """
感谢您的申请。经过综合评估,我们暂时无法批准本次贷款。
主要考虑因素包括:
{negative_factors}
建议改进方向:
{improvement_suggestions}
"""
templates['approval'] = approval_template
templates['rejection'] = rejection_template
# 生成具体案例
sample_explanations = self._generate_sample_explanations(model, X, y, feature_names)
templates['examples'] = sample_explanations
return templates
def _generate_sample_explanations(self, model, X, y, feature_names):
"""生成样本解释"""
sample_explanations = {}
# 选择几个代表性样本
indices = [0, 1, 2] # 简单选择前三个样本
for idx in indices:
explanation = self._explain_single_decision(model, X.iloc[idx:idx+1], y.iloc[idx], feature_names)
sample_explanations[f'sample_{idx}'] = explanation
return sample_explanations
def _explain_single_decision(self, model, instance, true_label, feature_names):
"""解释单个决策"""
explanation = {}
# 获取预测
if hasattr(model, 'predict_proba'):
proba = model.predict_proba(instance)[0]
prediction = 1 if proba[1] > 0.5 else 0
confidence = max(proba)
else:
prediction = model.predict(instance)[0]
confidence = 1.0
explanation['prediction'] = prediction
explanation['confidence'] = confidence
explanation['true_label'] = true_label
# 分析关键因素
key_factors = self._identify_key_factors(model, instance, feature_names)
explanation['key_factors'] = key_factors
# 生成自然语言解释
nl_explanation = self._generate_natural_language(key_factors, prediction, true_label)
explanation['natural_language'] = nl_explanation
return explanation
def _identify_key_factors(self, model, instance, feature_names):
"""识别关键影响因素"""
try:
import shap
# 使用SHAP分析
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(instance)
if isinstance(shap_values, list):
shap_values = shap_values[1] # 取正类SHAP值
# 获取特征贡献
contributions = {}
for i, feature in enumerate(feature_names):
contributions[feature] = shap_values[0, i]
# 排序并返回最重要的因素
sorted_contributions = sorted(contributions.items(), key=lambda x: abs(x[1]), reverse=True)
return sorted_contributions[:3] # 返回前3个关键因素
except:
# 回退方法:使用特征重要性
if hasattr(model, 'feature_importances_'):
importances = model.feature_importances_
feature_importance = list(zip(feature_names, importances))
sorted_importance = sorted(feature_importance, key=lambda x: x[1], reverse=True)
return [(feat, imp * 0.1) for feat, imp in sorted_importance[:3]] # 简化处理
else:
return [('无法分析', 0)]
def _generate_natural_language(self, key_factors, prediction, true_label):
"""生成自然语言解释"""
if prediction == 1: # 预测违约
explanation = "本次申请风险评估较高,主要因为:\n"
for factor, impact in key_factors:
if impact > 0:
explanation += f"• {factor} 增加了违约风险\n"
else:
explanation += f"• {factor} 未能充分降低风险\n"
else: # 预测正常
explanation = "本次申请风险评估良好,主要因为:\n"
for factor, impact in key_factors:
if impact < 0:
explanation += f"• {factor} 降低了违约风险\n"
else:
explanation += f"• {factor} 表现正常\n"
return explanation
def _generate_risk_insights(self, model, X, y, feature_names):
"""生成风险洞察报告"""
insights = {}
# 特征分布分析
risk_by_feature = self._analyze_risk_by_feature(model, X, y, feature_names)
insights['feature_analysis'] = risk_by_feature
# 客户分群洞察
segmentation_insights = self._customer_segmentation_insights(model, X, y, feature_names)
insights['segmentation'] = segmentation_insights
# 趋势分析
trend_analysis = self._risk_trend_analysis(model, X, y, feature_names)
insights['trends'] = trend_analysis
return insights
def _analyze_risk_by_feature(self, model, X, y, feature_names):
"""按特征分析风险"""
risk_analysis = {}
# 选择几个重要特征进行分析
important_features = ['credit_score', 'debt_to_income', 'income']
for feature_pattern in important_features:
matching_features = [f for f in feature_names if feature_pattern in f.lower()]
if matching_features:
feature = matching_features[0]
feature_values = X[feature]
# 分析不同分位的风险率
quantiles = [0.25, 0.5, 0.75]
risk_by_quantile = {}
for q in quantiles:
threshold = feature_values.quantile(q)
if feature_pattern in ['credit_score']:
# 高信用分数低风险
high_risk_group = feature_values < threshold
else:
high_risk_group = feature_values > threshold
risk_rate = y[high_risk_group].mean() if high_risk_group.any() else 0
risk_by_quantile[f'Q{q}'] = risk_rate
risk_analysis[feature] = risk_by_quantile
return risk_analysis
# 使用业务解释器
business_interpreter = BusinessInterpreter()
business_explanations = business_interpreter.create_business_friendly_explanations(
rf_model, X_test, y_test, feature_names
)
print("=== 业务可解释性分析报告 ===\n")
# 显示合规检查结果
compliance = business_explanations['compliance']
print("1. 监管合规检查:")
print(f" 状态: {compliance['status']}")
if compliance['issues']:
for issue in compliance['issues']:
print(f" ⚠️ {issue['issue']}: {issue['features']}")
print(f" 建议: {issue['recommendation']}")
# 显示业务规则对齐
alignment = business_explanations['business_alignment']
print(f"\n2. 业务规则对齐度: {alignment['alignment_score']:.1%}")
for feature, consistency in alignment['feature_consistency'].items():
status = "✓" if consistency['consistent'] else "✗"
print(f" {status} {feature}: 期望{consistency['expected']}, 实际{consistency['actual']}")
# 显示样本解释
print("\n3. 样本决策解释:")
for sample_key, explanation in business_explanations['communication']['examples'].items():
print(f"\n{sample_key}:")
print(f" 预测: {'违约' if explanation['prediction'] == 1 else '正常'}")
print(f" 置信度: {explanation['confidence']:.1%}")
print(f" 解释: {explanation['natural_language']}")
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。