TensorFlow任意比例压缩分布式通信

大油水藏在公式里。

本文能给你省10万块

TensorFlow是热门的深度学习框架,大量机构用它来训练各种模型。由于训练数据太多,1台服务器往往要算几小时甚至几周后才能看到模型的效果。为节省时间,必须使用多台服务器协作完成同一个训练任务。

本文介绍业界常用的2种通信模式,并提出一个更快的模式。

主从模式

TensorFlow支持分布式训练,采用主从模式(图1),劳工分别使用不同的样本进行训练,得到参数w(权重)和b(偏置);所有劳工每次使用一批(batch)数据训练、得到w的梯度Δw和b的梯度Δb;所有劳工将自己算得的梯度传给参数服务器,参数服务器计算所有权重梯度、偏置梯度的平均值,用平均值来用更新w和b,然后将最新的w和b传回各个劳工;劳工进行下一步训练。

图1

这样的主从模式下,参数服务器网络带宽是瓶颈。假设采用1个参数服务器和10个劳工,服务器之间用万兆以太网(带宽10Gb/s)连接,需要传输的参数大小是1GB,所有劳工将参数传给参数服务器需要花10*1/(10/8)=8秒,参数服务器将参数传回劳工还要再花8秒,共16秒。如果采用1个参数服务器和100劳工,那么参数传输共要花160秒。扩展性太差。

为了缓解参数服务器的网络瓶颈,TensorFlow支持多个参数服务器,见图2,每个参数服务器负责一部分参数的更新。以图2为例,假设有2个参数服务器和10个劳工,服务器之间用万兆以太网连接,需要传输的参数大小是1GB,所有劳工将参数分为两半,分别传给2台参数服务器需要花10*0.5/(10/8)=4秒,参数服务器将参数传给劳工还要再花4秒,共8秒。比1个参数服务器集群节省了一半的通信时间,代价就是增加了一个参数服务器。容易验证,用M个参数服务器能将通信时间减少至1/M,但实际上通信时间不会随着M的增加而一直减少。

图2

环形归约模式

多个参数服务器虽然能在一定程度上缓解通信瓶颈,但是无法从根本上消除瓶颈。因此,百度率先开发了一种称环形归约的通信模式,通过均衡网络负载来减少通信时间。

假设有5个主机,每个主机上有1个GPU,网络相互连通,逻辑上可以看成环状,见图3。

图3

假设5个GPU上都已经分别计算出了自己的权重梯度Δw、Δw1…Δw4偏置梯度Δb、Δb1…Δb4,下一步是5个GPU之间要交换数据,使用每个GPU上都有值Δw+Δw1+…+Δw4和Δb+Δb1+…+Δb4。这个交换过程可用如下方式实现。

每个GPU都将自己的待交换数据分为5段,如图4,这里的段数5与参与通信的GPU数量相同。

图4

接下来进行第一圈轮转相加,每次1/5的数

第一圈之后状态如图5,每个GPU都有一小块满足要求的数据。

图5

第二圈是轮转替换,也需要5步。

图6

第二圈完成之后的状态如图6,每个GPU都得到全部数据的和,达到目标。

现在考查环形归约的通信时间。假设有10个服务器(对应图中的GPU)要相互交换数据,服务器之间网络为万兆以太网(带宽10Gb/s),需要传输的参数大小是1GB,第一圈轮转相加时,每步传递数据1/10=0.1GB,花费时间0.1/(10/8)=0.08秒,10步共花费0.8秒。第二圈通信也花费0.8秒,共1.6秒。

假设有100个服务器参与通信,容易知道共花费1.6秒。实际上,无论有多少个服务器参与通信,都是花费1.6秒。比主从模式好太多。

百度开发了环形归约的代码。英伟达对环形归约做了深度优化,做成一个闭源软件包NCCL2.0,具体优化手段保密十分严格,据说买它1亿元GPU产品的公司都打听不出来。本人机缘巧合得知了这个秘密,只是不能公开说。

Uber开发了一个小软件包horovod[2],把NCCL2.0和TensorFlow对接了起来,摆脱了TensorFlow的Grpc通信组件,改用MPI通信,性能大幅提高。

交换权重

经过前面的准备,迎来大招。

主从模式和环形归约模式中,主机之间交换都是梯度数据。而梯度取决于当前样本、当前权重偏置,每计算完一批样本之后必须立即交换梯度,否则就导致收敛曲线振荡甚至不收敛。

如果想进一步减少通信时间怎么呢?有人想出了办法:通信时将梯度值由32位浮点精度降低为16位浮点精度,或者将梯度矩阵分解为两个向量(两个较小矩阵)的乘积。总之就是损失精度换取速度。

其实,仔细观察深度学习的通用公式就能发现减少通信时间的好方法:不通信。

从梯度更新公式可知,每一批样本数据都会对参数w和b做一些改动,这个改动量就是梯度Δw和Δb。通常情况下,训练样本都已经被混洗均匀,每批样本都与整体分布大致相符。如果使用Q批样本连续更新参数w和b,那么这些样本对参数的Q次改动已经积累在w和b中了。因此,不必训练完每批样本后立即交换梯度,而是在训练完Q批样本后再交换参数w和b本身。这里的Q可以随意指定,这也意味着可以任意比例减少通信时间,具体操作示意图见图7.

图7

代码实现。虽然有了思路,但在TensorFlow里实现不易,这是因为TensorFlow里都是更新梯度,可能没有想过为交换参数本身提供工具。在踩过多个坑之后发现,还是在TensorFlow之外通信方便。

用3层神经网络进行二分类,将土星环状样本点(图8)分开;分类结果见图9,绿色多边形将两类样本点完分开。

图8

图9

简要分析代码。

第3行引入软件包mpi4py。第4-9行定义几个全局变量,mpicomm是MPI通信域,包含所有的MPI进程;mpirank是当前进程的编号,从0开始;mpisize是mpicomm中进程的数量;mpilocalrank是当前过程在本地服务器上的编号,例如在2台服务器上共启动了4个进程,每台服务器上有2个进程,那么mpilocalrank的取值为0或1。Mpilocalrank用于将进程与服务器上的多个GPU分别绑定,本例中没有用到。

第11-20行定义的函数BcastGlobalVariables用于广播全局变量。在tensorflow初始化完成之后,将0号进程的全局变量值广播给所有进程,使所有进程的初始值都相同。

第22-29行定义的函数AvgTrainVars用于在所有进程上归约得到所有的训练变量值,要求事先获取训练变量。

第128行获取训练变量的列表trainvars,被优化器更新所有可训练变量都在里面。第139行在tensorflow初始化全局变量之后立即广播。

第144行用mpirank参数计算样本的偏移量,使得每个进程都在不同的样本上训练。第148-149行,每隔20步训练才进行一次归约通信,减少通信量。间隔步数20可以随意指定,从而可以任意比例压缩通信时间。

对比输出的w2、b2、w3、b3的值,两个进程相差无几,算法正确。

1importnumpyasnp

2importtensorflowastf

3frommpi4pyimportMPI

4

5mpicomm = MPI.COMM_WORLD

6mpirank =mpicomm.Get_rank()

7mpisize =mpicomm.Get_size()

8mpilcomm =mpicomm.Split_type(MPI.COMM_TYPE_SHARED)

9mpilocalrank =mpilcomm.Get_rank()

10

11 defBcastGlobalVariables(comm, rank,sess):

12gvars = tf.global_variables()

13vlen =len(gvars)

14foridxinxrange(vlen):

15var =np.zeros(gvars[idx].shape,dtype=gvars[idx].dtype.as_numpy_dtype)

16if== rank:

17var = sess.run(gvars[idx])

18comm.Bcast(var,root=)

19ifrank >:

20gvars[idx].load(var,sess)

21

22 defAvgTrainVars(trainvars, comm, sess):

23vlen =len(trainvars)

24size = comm.Get_size()

25foridxinxrange(vlen):

26val = sess.run(trainvars[idx])

27valsum =np.zeros(val.shape,dtype=val.dtype)

28comm.Allreduce(val,valsum)

29trainvars[idx].load(valsum/size,sess)

30

31

32# Globalvariables.

33NUM_LABELS =2# The number of labels.

34BATCH_SIZE =100# The number of training examples to use per trainingstep.

35SEED =None# Set to None for random seed.

36

37tf.app.flags.DEFINE_string('train',None,

38'File containing the training data (labels &features).')

39tf.app.flags.DEFINE_string('test',None,

40'File containing the test data (labels & features).')

41tf.app.flags.DEFINE_integer('num_epochs',3,

42'Number of passes over the training data.')

43tf.app.flags.DEFINE_integer('num_hidden',1,

44'Number of nodes in the hidden layer.')

45FLAGS =tf.app.flags.FLAGS

46

47# Extractnumpy representations of the labels and features given rows consisting of:

48# label, feat_0, feat_1, ..., feat_n

49 defextract_data(filename):

50

51# Arrays tohold the labels and feature vectors.

52labels = []

53fvecs = []

54

55# Iterate overthe rows, splitting the label from the features. Convert labels

56# to integersand features to floats.

57forlineinfile(filename):

58row = line.split(",")

59labels.append(int(row[]))

60fvecs.append([float(x)forxinrow[1:]])

61

62# Convert thearray of float arrays into a numpy float matrix.

63fvecs_np =np.matrix(fvecs).astype(np.float32)

64

65# Convert the arrayof int labels into a numpy array.

66labels_np =np.array(labels).astype(dtype=np.uint8)

67

68# Convert theint numpy array into a one-hot matrix.

69labels_onehot = (np.arange(NUM_LABELS) ==labels_np[:,None]).astype(np.float32)

70

71# Return a pairof the feature matrix and the one-hot label matrix.

72returnfvecs_np,labels_onehot

73

74 defmain(argv=None):

75

76# Get the data.

77train_data_filename = FLAGS.train

78test_data_filename = FLAGS.test

79

80# Extract itinto numpy arrays.

81train_data,train_labels =extract_data(train_data_filename)

82test_data, test_labels =extract_data(test_data_filename)

83#print"train_data=", train_data

84

85# Get the shapeof the training data.

86train_size,num_features = train_data.shape

87

88# Get thenumber of epochs for training.

89num_epochs = FLAGS.num_epochs

90

91# Get the sizeof layer one.

92num_hidden = FLAGS.num_hidden

93

94# This is wheretraining samples and labels are fed to the graph.

95# Theseplaceholder nodes will be fed a batch of training data at each

96# training stepusing the argument to the Run() call below.

97x = tf.placeholder("float",shape=[None, num_features])

98y_ = tf.placeholder("float", shape=[None,NUM_LABELS])

99

100# For the test data, hold the entire dataset in oneconstant node.

101test_data_node= tf.constant(test_data)

102

103# Define and initialize the network.

104

105# Initialize the hidden weights and biases.

106w2 =tf.Variable(

107tf.truncated_normal([num_features,num_hidden],

108stddev=0.1,

109seed=SEED))

110b2 = tf.Variable(tf.zeros([1,num_hidden]))

111

112# The hidden layer.

114

115# Initialize the output weights and biases.

116w3 =tf.Variable(

117tf.truncated_normal([num_hidden,NUM_LABELS],

118stddev=0.1,

119seed=SEED))

120b3 =tf.Variable(tf.zeros([1,NUM_LABELS]))

121

122# The output layer.

124

125# Optimization.

126cross_entropy =-tf.reduce_sum(y_*tf.log(y))

127train_step =tf.train.GradientDescentOptimizer(0.01).minimize(cross_entropy)

128trainvars =[varfor_,varintf.train.GradientDescentOptimizer(0.01).compute_gradients(cross_entropy)]

129

130# Evaluation.

131predicted_class= tf.argmax(y,1);

132correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))

133accuracy =tf.reduce_mean(tf.cast(correct_prediction,"float"))

134

135# Create a local session to run this computation.

136withtf.Session()ass:

137# Run all the initializers to prepare the trainableparameters.

138tf.global_variables_initializer().run()

139BcastGlobalVariables(mpicomm,mpirank,s)

140

141# Iterate and train.

142forstepinxrange(num_epochs * train_size //(mpisize*BATCH_SIZE) ):

143

144offset= (step *mpisize* BATCH_SIZE) % train_size + mpirank

145batch_data =train_data[offset:(offset + BATCH_SIZE), :]

146batch_labels =train_labels[offset:(offset + BATCH_SIZE)]

147train_step.run(feed_dict=)

148if(step %20)==:

149AvgTrainVars(trainvars,mpicomm, s)

150print"step=", step

151print"Accuracy:",accuracy.eval(feed_dict=)

152print"predicted_class:",predicted_class.eval(feed_dict=)

153

154

155print"w2",s.run(w2)

156print"b2",s.run(b2)

157print"w3 =",s.run(w3)

158print"b3 =",s.run(b3)

159

160 if__name__ =='__main__':

161tf.app.run()

参考文献

[1] http://research.baidu.com/bringing-hpc-techniques-deep-learning/

[2] https://github.com/uber/horovod

  • 发表于:
  • 原文链接:http://kuaibao.qq.com/s/20180118B0T72Z00?refer=cp_1026

扫码关注云+社区