上一篇我们讲了MMoE多任务网络,通过对每一个任务塔建立Gate门控,对专家网络进行加权平均,Gate门控起到了对多个共享专家重要度筛选的作用。在每轮反向传播时,每个任务tower分别更新对应Gate的参数,以及共享专家的参数。模型主要起到了多目标任务平衡的作用。
今天我们重点将CGC(Customized Gate Control)定制门控网络,核心思想是在MMoE基础上,为每一个任务tower定制独享专家,实用任务独享专家与共享专家共同决定任务Tower的输入,相比于MMoE仅用Gate门控表征任务Tower的方法,CGC引入独享专家,对任务表征更加全面,又通过共享专家保证关联性。
CGC(Customized Gate Control)全称为定制门控网络,主要由多个任务塔、对应多组独享专家网络,对应多个门控网络以及一组共享专家网络,专家网络组内可以包含多个专家MLP。核心原理:样本input分别输入共享专家MLP、独立专家MLP、独立专家对应门控网络,门控网络输出为经过softmax的权重分布,维度对应共享专家数num_shared_experts和独立专家数num_task_experts的和,通过对独立专家输出和共享专家输出采用Gate门控加权平均后, 输入到对应的任务Tower。每个任务Tower输入自己对应的独享专家、共享专家、门控加权平均的输入。反向传播时,每个任务更新自己独享专家、独享门控以及共享专家的参数。
相较于MMoE网络,CGC为每一个任务tower定制独享专家,实用任务独享专家与共享专家共同决定任务Tower的输入,相比于MMoE仅用Gate门控表征任务Tower的方法,CGC引入独享专家,对任务表征更加全面,又通过共享专家保证关联性。
优点:
缺点:
我们还是以小红书推荐场景为例,针对一个视频,用户可以点红心(互动),也可以点击视频进行播放(点击),针对互动和点击两个目标进行多目标建模
我们构建一个100维特征输入,1组共享专家网络(含2个共享专家),2组独享专家网络(各含2个独享专家),2个门控,2个任务塔的CGC网络,用于建模多目标学习问题,模型架构图如下:
如架构图所示,其中有几个注意的点:
基于pytorch,实现上述CGC网络架构,如下:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
class CGCModel(nn.Module):
def __init__(self, input_dim, experts_hidden1_dim, experts_hidden2_dim, output_experts_dim, task_hidden1_dim, task_hidden2_dim, output_task1_dim, output_task2_dim, gate_hidden1_dim, gate_hidden2_dim, num_shared_experts, num_task_experts):
super(CGCModel, self).__init__()
# 初始化函数外使用初始化变量需要赋值,否则默认使用全局变量
# 初始化函数内使用初始化变量不需要赋值
self.num_shared_experts = num_shared_experts
self.num_task_experts = num_task_experts
self.output_experts_dim = output_experts_dim
# 初始化共享专家
self.shared_experts_2 = nn.ModuleList([
nn.Sequential(
nn.Linear(input_dim, experts_hidden1_dim),
nn.ReLU(),
nn.Linear(experts_hidden1_dim, experts_hidden2_dim),
nn.ReLU(),
nn.Linear(experts_hidden2_dim, output_experts_dim),
nn.ReLU()
) for _ in range(num_shared_experts)
])
# 初始化任务1专家
self.task1_experts_2 = nn.ModuleList([
nn.Sequential(
nn.Linear(input_dim, experts_hidden1_dim),
nn.ReLU(),
nn.Linear(experts_hidden1_dim, experts_hidden2_dim),
nn.ReLU(),
nn.Linear(experts_hidden2_dim, output_experts_dim),
nn.ReLU()
) for _ in range(num_task_experts)
])
# 初始化任务2专家
self.task2_experts_2 = nn.ModuleList([
nn.Sequential(
nn.Linear(input_dim, experts_hidden1_dim),
nn.ReLU(),
nn.Linear(experts_hidden1_dim, experts_hidden2_dim),
nn.ReLU(),
nn.Linear(experts_hidden2_dim, output_experts_dim),
nn.ReLU()
) for _ in range(num_task_experts)
])
# 初始化门控网络任务1
self.gating1_network_2 = nn.Sequential(
nn.Linear(input_dim, gate_hidden1_dim),
nn.ReLU(),
nn.Linear(gate_hidden1_dim, gate_hidden2_dim),
nn.ReLU(),
nn.Linear(gate_hidden2_dim, num_shared_experts+num_task_experts),
nn.Softmax(dim=1)
)
# 初始化门控网络任务2
self.gating2_network_2 = nn.Sequential(
nn.Linear(input_dim, gate_hidden1_dim),
nn.ReLU(),
nn.Linear(gate_hidden1_dim, gate_hidden2_dim),
nn.ReLU(),
nn.Linear(gate_hidden2_dim, num_shared_experts+num_task_experts),
nn.Softmax(dim=1)
)
# 定义任务1的输出层
self.task1_head = nn.Sequential(
nn.Linear(output_experts_dim, task_hidden1_dim),
nn.ReLU(),
nn.Linear(task_hidden1_dim, task_hidden2_dim),
nn.ReLU(),
nn.Linear(task_hidden2_dim, output_task1_dim),
nn.Sigmoid()
)
# 定义任务2的输出层
self.task2_head = nn.Sequential(
nn.Linear(output_experts_dim, task_hidden1_dim),
nn.ReLU(),
nn.Linear(task_hidden1_dim, task_hidden2_dim),
nn.ReLU(),
nn.Linear(task_hidden2_dim, output_task2_dim),
nn.Sigmoid()
)
def forward(self, x):
gates1 = self.gating1_network_2(x)
gates2 = self.gating2_network_2(x)
#定义专家网络输出作为任务塔输入
batch_size, _ = x.shape
task1_inputs = torch.zeros(batch_size, self.output_experts_dim)
task2_inputs = torch.zeros(batch_size, self.output_experts_dim)
for i in range(self.num_shared_experts):
task1_inputs += self.shared_experts_2[i](x) * gates1[:, i].unsqueeze(1) + self.task1_experts_2[i](x) * gates1[:, i+self.num_shared_experts].unsqueeze(1)
task2_inputs += self.shared_experts_2[i](x) * gates2[:, i].unsqueeze(1) + self.task2_experts_2[i](x) * gates2[:, i+self.num_shared_experts].unsqueeze(1)
task1_outputs = self.task1_head(task1_inputs)
task2_outputs = self.task2_head(task2_inputs)
return task1_outputs, task2_outputs
# 实例化模型对象
experts_hidden1_dim = 64
experts_hidden2_dim = 32
output_experts_dim = 16
gate_hidden1_dim = 16
gate_hidden2_dim = 8
task_hidden1_dim = 32
task_hidden2_dim = 16
output_task1_dim = 1
output_task2_dim = 1
num_shared_experts = 2
num_task_experts = 2
# 构造虚拟样本数据
torch.manual_seed(42) # 设置随机种子以保证结果可重复
input_dim = 100
num_samples = 1024
X_train = torch.randint(0, 2, (num_samples, input_dim)).float()
y_train_task1 = torch.rand(num_samples, output_task1_dim) # 假设任务1的输出维度为1
y_train_task2 = torch.rand(num_samples, output_task2_dim) # 假设任务2的输出维度为1
# 创建数据加载器
train_dataset = TensorDataset(X_train, y_train_task1, y_train_task2)
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
model = CGCModel(input_dim, experts_hidden1_dim, experts_hidden2_dim, output_experts_dim, task_hidden1_dim, task_hidden2_dim, output_task1_dim, output_task2_dim, gate_hidden1_dim, gate_hidden2_dim, num_shared_experts, num_task_experts)
# 定义损失函数和优化器
criterion_task1 = nn.MSELoss()
criterion_task2 = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# 训练循环
num_epochs = 100
for epoch in range(num_epochs):
model.train()
running_loss = 0.0
for batch_idx, (X_batch, y_task1_batch, y_task2_batch) in enumerate(train_loader):
# 前向传播: 获取预测值
#print(batch_idx, X_batch )
#print(f'Epoch [{epoch+1}/{num_epochs}-{batch_idx}], Loss: {running_loss/len(train_loader):.4f}')
outputs_task1, outputs_task2 = model(X_batch)
# 计算每个任务的损失
loss_task1 = criterion_task1(outputs_task1, y_task1_batch)
loss_task2 = criterion_task2(outputs_task2, y_task2_batch)
total_loss = loss_task1 + loss_task2
# 反向传播和优化
optimizer.zero_grad()
total_loss.backward()
optimizer.step()
running_loss += total_loss.item()
if epoch % 10 == 0:
print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')
print(model)
#for param_tensor in model.state_dict():
# print(param_tensor, "\t", model.state_dict()[param_tensor].size())
# 模型预测
model.eval()
with torch.no_grad():
test_input = torch.randint(0, 2, (1, input_dim)).float() # 构造一个测试样本
pred_task1, pred_task2 = model(test_input)
print(f'互动目标预测结果: {pred_task1}')
print(f'点击目标预测结果: {pred_task2}')
相比于上一篇MMoE中的代码,CGC复杂了很多,新增了2组独享专家,且在门控与独享、共享专家加权平均计算的时候需要进行处理,很容易出问题。
运行上述代码,模型启动训练,Loss逐渐收敛,测试结果如下:
本文详细介绍了CGC多任务模型的算法原理、算法优势,他是下一篇PLE多层多任务模型的基础,并以小红书业务场景为例,构建CGC网络结构并使用pytorch代码实现对应的网络结构、训练流程。相比于MMoE,CGC新增独享专家网络,通过gate门控的串联,切断任务Tower与其他任务独享专家的联系,使得独享专家能够更专注的学习本任务内的知识与信息。