大油水藏在公式里。
本文能给你省10万块
TensorFlow是热门的深度学习框架,大量机构用它来训练各种模型。由于训练数据太多,1台服务器往往要算几小时甚至几周后才能看到模型的效果。为节省时间,必须使用多台服务器协作完成同一个训练任务。
本文介绍业界常用的2种通信模式,并提出一个更快的模式。
主从模式
TensorFlow支持分布式训练,采用主从模式(图1),劳工分别使用不同的样本进行训练,得到参数w(权重)和b(偏置);所有劳工每次使用一批(batch)数据训练、得到w的梯度Δw和b的梯度Δb;所有劳工将自己算得的梯度传给参数服务器,参数服务器计算所有权重梯度、偏置梯度的平均值,用平均值来用更新w和b,然后将最新的w和b传回各个劳工;劳工进行下一步训练。
图1
这样的主从模式下,参数服务器网络带宽是瓶颈。假设采用1个参数服务器和10个劳工,服务器之间用万兆以太网(带宽10Gb/s)连接,需要传输的参数大小是1GB,所有劳工将参数传给参数服务器需要花10*1/(10/8)=8秒,参数服务器将参数传回劳工还要再花8秒,共16秒。如果采用1个参数服务器和100劳工,那么参数传输共要花160秒。扩展性太差。
为了缓解参数服务器的网络瓶颈,TensorFlow支持多个参数服务器,见图2,每个参数服务器负责一部分参数的更新。以图2为例,假设有2个参数服务器和10个劳工,服务器之间用万兆以太网连接,需要传输的参数大小是1GB,所有劳工将参数分为两半,分别传给2台参数服务器需要花10*0.5/(10/8)=4秒,参数服务器将参数传给劳工还要再花4秒,共8秒。比1个参数服务器集群节省了一半的通信时间,代价就是增加了一个参数服务器。容易验证,用M个参数服务器能将通信时间减少至1/M,但实际上通信时间不会随着M的增加而一直减少。
图2
环形归约模式
多个参数服务器虽然能在一定程度上缓解通信瓶颈,但是无法从根本上消除瓶颈。因此,百度率先开发了一种称环形归约的通信模式,通过均衡网络负载来减少通信时间。
假设有5个主机,每个主机上有1个GPU,网络相互连通,逻辑上可以看成环状,见图3。
图3
假设5个GPU上都已经分别计算出了自己的权重梯度Δw、Δw1…Δw4偏置梯度Δb、Δb1…Δb4,下一步是5个GPU之间要交换数据,使用每个GPU上都有值Δw+Δw1+…+Δw4和Δb+Δb1+…+Δb4。这个交换过程可用如下方式实现。
每个GPU都将自己的待交换数据分为5段,如图4,这里的段数5与参与通信的GPU数量相同。
图4
接下来进行第一圈轮转相加,每次1/5的数
第一圈之后状态如图5,每个GPU都有一小块满足要求的数据。
图5
第二圈是轮转替换,也需要5步。
图6
第二圈完成之后的状态如图6,每个GPU都得到全部数据的和,达到目标。
现在考查环形归约的通信时间。假设有10个服务器(对应图中的GPU)要相互交换数据,服务器之间网络为万兆以太网(带宽10Gb/s),需要传输的参数大小是1GB,第一圈轮转相加时,每步传递数据1/10=0.1GB,花费时间0.1/(10/8)=0.08秒,10步共花费0.8秒。第二圈通信也花费0.8秒,共1.6秒。
假设有100个服务器参与通信,容易知道共花费1.6秒。实际上,无论有多少个服务器参与通信,都是花费1.6秒。比主从模式好太多。
百度开发了环形归约的代码。英伟达对环形归约做了深度优化,做成一个闭源软件包NCCL2.0,具体优化手段保密十分严格,据说买它1亿元GPU产品的公司都打听不出来。本人机缘巧合得知了这个秘密,只是不能公开说。
Uber开发了一个小软件包horovod[2],把NCCL2.0和TensorFlow对接了起来,摆脱了TensorFlow的Grpc通信组件,改用MPI通信,性能大幅提高。
交换权重
经过前面的准备,迎来大招。
主从模式和环形归约模式中,主机之间交换都是梯度数据。而梯度取决于当前样本、当前权重偏置,每计算完一批样本之后必须立即交换梯度,否则就导致收敛曲线振荡甚至不收敛。
如果想进一步减少通信时间怎么呢?有人想出了办法:通信时将梯度值由32位浮点精度降低为16位浮点精度,或者将梯度矩阵分解为两个向量(两个较小矩阵)的乘积。总之就是损失精度换取速度。
其实,仔细观察深度学习的通用公式就能发现减少通信时间的好方法:不通信。
从梯度更新公式可知,每一批样本数据都会对参数w和b做一些改动,这个改动量就是梯度Δw和Δb。通常情况下,训练样本都已经被混洗均匀,每批样本都与整体分布大致相符。如果使用Q批样本连续更新参数w和b,那么这些样本对参数的Q次改动已经积累在w和b中了。因此,不必训练完每批样本后立即交换梯度,而是在训练完Q批样本后再交换参数w和b本身。这里的Q可以随意指定,这也意味着可以任意比例减少通信时间,具体操作示意图见图7.
图7
代码实现。虽然有了思路,但在TensorFlow里实现不易,这是因为TensorFlow里都是更新梯度,可能没有想过为交换参数本身提供工具。在踩过多个坑之后发现,还是在TensorFlow之外通信方便。
用3层神经网络进行二分类,将土星环状样本点(图8)分开;分类结果见图9,绿色多边形将两类样本点完分开。
图8
图9
简要分析代码。
第3行引入软件包mpi4py。第4-9行定义几个全局变量,mpicomm是MPI通信域,包含所有的MPI进程;mpirank是当前进程的编号,从0开始;mpisize是mpicomm中进程的数量;mpilocalrank是当前过程在本地服务器上的编号,例如在2台服务器上共启动了4个进程,每台服务器上有2个进程,那么mpilocalrank的取值为0或1。Mpilocalrank用于将进程与服务器上的多个GPU分别绑定,本例中没有用到。
第11-20行定义的函数BcastGlobalVariables用于广播全局变量。在tensorflow初始化完成之后,将0号进程的全局变量值广播给所有进程,使所有进程的初始值都相同。
第22-29行定义的函数AvgTrainVars用于在所有进程上归约得到所有的训练变量值,要求事先获取训练变量。
第128行获取训练变量的列表trainvars,被优化器更新所有可训练变量都在里面。第139行在tensorflow初始化全局变量之后立即广播。
第144行用mpirank参数计算样本的偏移量,使得每个进程都在不同的样本上训练。第148-149行,每隔20步训练才进行一次归约通信,减少通信量。间隔步数20可以随意指定,从而可以任意比例压缩通信时间。
对比输出的w2、b2、w3、b3的值,两个进程相差无几,算法正确。
1importnumpyasnp
2importtensorflowastf
3frommpi4pyimportMPI
4
5mpicomm = MPI.COMM_WORLD
6mpirank =mpicomm.Get_rank()
7mpisize =mpicomm.Get_size()
8mpilcomm =mpicomm.Split_type(MPI.COMM_TYPE_SHARED)
9mpilocalrank =mpilcomm.Get_rank()
10
11 defBcastGlobalVariables(comm, rank,sess):
12gvars = tf.global_variables()
13vlen =len(gvars)
14foridxinxrange(vlen):
15var =np.zeros(gvars[idx].shape,dtype=gvars[idx].dtype.as_numpy_dtype)
16if== rank:
17var = sess.run(gvars[idx])
18comm.Bcast(var,root=)
19ifrank >:
20gvars[idx].load(var,sess)
21
22 defAvgTrainVars(trainvars, comm, sess):
23vlen =len(trainvars)
24size = comm.Get_size()
25foridxinxrange(vlen):
26val = sess.run(trainvars[idx])
27valsum =np.zeros(val.shape,dtype=val.dtype)
28comm.Allreduce(val,valsum)
29trainvars[idx].load(valsum/size,sess)
30
31
32# Globalvariables.
33NUM_LABELS =2# The number of labels.
34BATCH_SIZE =100# The number of training examples to use per trainingstep.
35SEED =None# Set to None for random seed.
36
37tf.app.flags.DEFINE_string('train',None,
38'File containing the training data (labels &features).')
39tf.app.flags.DEFINE_string('test',None,
40'File containing the test data (labels & features).')
41tf.app.flags.DEFINE_integer('num_epochs',3,
42'Number of passes over the training data.')
43tf.app.flags.DEFINE_integer('num_hidden',1,
44'Number of nodes in the hidden layer.')
45FLAGS =tf.app.flags.FLAGS
46
47# Extractnumpy representations of the labels and features given rows consisting of:
48# label, feat_0, feat_1, ..., feat_n
49 defextract_data(filename):
50
51# Arrays tohold the labels and feature vectors.
52labels = []
53fvecs = []
54
55# Iterate overthe rows, splitting the label from the features. Convert labels
56# to integersand features to floats.
57forlineinfile(filename):
58row = line.split(",")
59labels.append(int(row[]))
60fvecs.append([float(x)forxinrow[1:]])
61
62# Convert thearray of float arrays into a numpy float matrix.
63fvecs_np =np.matrix(fvecs).astype(np.float32)
64
65# Convert the arrayof int labels into a numpy array.
66labels_np =np.array(labels).astype(dtype=np.uint8)
67
68# Convert theint numpy array into a one-hot matrix.
69labels_onehot = (np.arange(NUM_LABELS) ==labels_np[:,None]).astype(np.float32)
70
71# Return a pairof the feature matrix and the one-hot label matrix.
72returnfvecs_np,labels_onehot
73
74 defmain(argv=None):
75
76# Get the data.
77train_data_filename = FLAGS.train
78test_data_filename = FLAGS.test
79
80# Extract itinto numpy arrays.
81train_data,train_labels =extract_data(train_data_filename)
82test_data, test_labels =extract_data(test_data_filename)
83#print"train_data=", train_data
84
85# Get the shapeof the training data.
86train_size,num_features = train_data.shape
87
88# Get thenumber of epochs for training.
89num_epochs = FLAGS.num_epochs
90
91# Get the sizeof layer one.
92num_hidden = FLAGS.num_hidden
93
94# This is wheretraining samples and labels are fed to the graph.
95# Theseplaceholder nodes will be fed a batch of training data at each
96# training stepusing the argument to the Run() call below.
97x = tf.placeholder("float",shape=[None, num_features])
98y_ = tf.placeholder("float", shape=[None,NUM_LABELS])
99
100# For the test data, hold the entire dataset in oneconstant node.
101test_data_node= tf.constant(test_data)
102
103# Define and initialize the network.
104
105# Initialize the hidden weights and biases.
106w2 =tf.Variable(
107tf.truncated_normal([num_features,num_hidden],
108stddev=0.1,
109seed=SEED))
110b2 = tf.Variable(tf.zeros([1,num_hidden]))
111
112# The hidden layer.
114
115# Initialize the output weights and biases.
116w3 =tf.Variable(
117tf.truncated_normal([num_hidden,NUM_LABELS],
118stddev=0.1,
119seed=SEED))
120b3 =tf.Variable(tf.zeros([1,NUM_LABELS]))
121
122# The output layer.
124
125# Optimization.
126cross_entropy =-tf.reduce_sum(y_*tf.log(y))
127train_step =tf.train.GradientDescentOptimizer(0.01).minimize(cross_entropy)
128trainvars =[varfor_,varintf.train.GradientDescentOptimizer(0.01).compute_gradients(cross_entropy)]
129
130# Evaluation.
131predicted_class= tf.argmax(y,1);
132correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
133accuracy =tf.reduce_mean(tf.cast(correct_prediction,"float"))
134
135# Create a local session to run this computation.
136withtf.Session()ass:
137# Run all the initializers to prepare the trainableparameters.
138tf.global_variables_initializer().run()
139BcastGlobalVariables(mpicomm,mpirank,s)
140
141# Iterate and train.
142forstepinxrange(num_epochs * train_size //(mpisize*BATCH_SIZE) ):
143
144offset= (step *mpisize* BATCH_SIZE) % train_size + mpirank
145batch_data =train_data[offset:(offset + BATCH_SIZE), :]
146batch_labels =train_labels[offset:(offset + BATCH_SIZE)]
147train_step.run(feed_dict=)
148if(step %20)==:
149AvgTrainVars(trainvars,mpicomm, s)
150print"step=", step
151print"Accuracy:",accuracy.eval(feed_dict=)
152print"predicted_class:",predicted_class.eval(feed_dict=)
153
154
155print"w2",s.run(w2)
156print"b2",s.run(b2)
157print"w3 =",s.run(w3)
158print"b3 =",s.run(b3)
159
160 if__name__ =='__main__':
161tf.app.run()
参考文献
[1] http://research.baidu.com/bringing-hpc-techniques-deep-learning/
[2] https://github.com/uber/horovod
领取专属 10元无门槛券
私享最新 技术干货