对于运筹优化算法工作者来说,更优的结果和更快的运行速度是我们不懈追求的目标。在实践中我们很 容易就能感受到,这个目标主要取决于算法本身的设计,同时也取决于算法的具体实现方式。本文主要 分享一点算法实现中的加速方法,特别针对python用户。本文将以分支定价求解VRPTW为例,主要介绍 两个方面的技巧,第一个是在python中使用C++库,第二个是分支定界过程的并行化,希望能给大家带来一些帮助。
Python是当前最热门的编程语言之一,特别是在数据科学、深度学习等领域。但是Python的性能问题却 是一个绕不开的话题,以至于算法的核心部分很多人选择用C/C++重写。本文要讲的第一部分内容就是 假设你用python实现了自己的算法,然后发现算法的某个部分刚好有一个现成的C/C++库可以使用,如 何在你的代码里调用这个库呢?方法是多种多样的,这里以VRPTW为例介绍其中的一种方法。
数据魔术师的粉丝应该记得,数据魔术师曾经发布过一个脉冲算法求解ESPPRC的C++实现(忘记的同学可以 戳这里)。ESPPRC是分支定价求解VRPTW时的子问题,如果我们用这个库去求解子问题,会比我们自己用python实现一遍脉冲算法要快得多。我们把这个子问题的求解过程看作一个黑箱,先分析一下我们需要的输入和输出是什么。在进行列生成的时候,我们希望对这个黑箱传入一组主问题那里得到的对偶变量,然后这个黑箱吐出一个reduced_cost最小的合法路径。明确了这个输入和输出,我们需要先对C++代码略做修改。以下是原始C++代码的main函数。
int main(int args, char *argv[]) {
std::string filename = argv[1];
//std::string filename = "data/solomon_100/C201.txt";
int max_capacity;
std::vector< std::vector<int> > data;
reader(data, filename, max_capacity);
int node_number = int(data.size()) + 1, step = 10, lt = int(0.1 * data[0]
[4]), ut = data[0][4];
Espprc model(node_number, 0, node_number - 1, step, lt, ut, max_capacity,
data);
//model.G.show_graph(); model.print_para();
double duration;
std::clock_t start = std::clock();
model.espprc();
duration = ( std::clock() - start ) / (double) CLOCKS_PER_SEC;
std::cout<<"duration: "<<duration<<std::endl;
return 0;
}
可以看到原始的输入是算例文件地址,输出是标准输出流。现在算例文件地址这个输入保留,需要加入 一个类似std::vector的输入来表示对偶变量,输出也要换成类似std::vector用以表示得到 的路径。假设我们把这个函数重新命名为calculate(),现在calculate()接收算例文件地址和对偶变量,返 回reduced_cost最小的路径。(这里对偶变量命名沿用C++中的fixed_rand,方便大家理解原始C++代码)
std::vector<int> calculate(std::string filename, std::vector<double> fixed_rand)
{
int max_capacity;
std::vector<std::vector<int>> data;
reader(data, filename, max_capacity);
int node_number = int(data.size()) + 1 , step = 10 , lt = int(0.1 * data[ 0 ]
[ 4 ]), ut = data[ 0 ][ 4 ];
Espprc model(node_number, 0 , node_number - 1 , step, lt, ut, max_capacity,
data, fixed_rand);
return model.espprc();
}
为了让python中能调用这个calculate()函数,需要把C++文件编译成动态链接库(.dll)。但是python不认识C++的std::vector和std::string,因此需要做一个”包装“,使得python的列表和字符串能和上述 C++的数据类型相互转换。幸好这个繁琐的过程有现成的工具可用,比如swig(swig的使用不是本文的重点,大家百度一下,教程很多。除了swig也有其他方法,swig差不多算是最方便的了)。swig会生成两个文件:.py文件可以认为就是我们在python里将要调用的那个包含了calculate()的包,.cxx文件里就是我们上面说的转换数据类型的代码,把这个.cxx文件跟之前修改后的C++代码放在一起编译。比如我们 的.py文件叫做espprc.py,那么编译后的.dll文件默认应该改名为_espprc.pyd。现在把这个espprc.py和_espprc.pyd放进python代码目录下面,就可以使用我们熟悉的 "from espprc import calculate" 了。
我们现在把分支定价过程里的耗时大户——子问题求解改用C++实现了,效率增加了不少,可以稍微歇息一下,接下来进入本文要介绍的第二个技巧。
随着分支的进行,需要求解的节点越来越多,一个自然的想法是并行化求解这些节点。鉴于这是一个CPU bounding任务,采用多进程的方案是比较稳妥的。但是多进程求解分支定界问题的难度在于节点之间不是孤立的,比如我们采用的分支方式是包含边(a,b)和不含边(a,b)。那么子节点的初始路径池、边(a,b)这两个信息需要从父节点传到子节点,这需要借助进程间通信实现。更繁琐的是为了更新全局的上下界,每个节点的求解结果都需要被保存和传递,这又是一系列的进程间通信。不过幸运的是有python工具包已经实现了多进程的分支定界框架。pybnb就是这样一个工具,它只需要我们定义好一个问题类,而不用关心分支定界的具体实施过程。具体来说是定义好问题的目标函数值计算方式、界计算方式、状态传递方式、分支方式。
import pybnb
class MyProblem(pybnb.Problem):
def __init__(self): ...
# required methods
def sense(self): ...
def objective(self): ...
def bound(self): ...
def save_state(self, node): ...
def load_state(self, node): ...
def branch(self): ...
# optional methods
def notify_solve_begins(self,
comm,
worker_comm,
convergence_checker):
...
def notify_new_best_node(self,
node,
current):
...
def notify_solve_finished(self,
comm,
worker_comm,
results):
...
上面的代码就是pybnb的接口示例,我们最主要的工作就是要定义objective()、bound()、save_state()、load_state()、branch()这几个方法。load_state()方法是定义节点要传递的内容,如上所述,我们要传递的是路径池、去掉的边、强制保留的边,那么我们的load_state()就如下所示:
def load_state(self, node):
# ini_continuous_routes_pool路径池 ini_ban_edges去掉的边 ini_set_edges强制保留的边
(self.ini_continuous_routes_pool, self.ini_ban_edges, self.ini_set_edges) = node.state
然后在branch()方法内会定义子节点内这些状态时是如何生成的:
def branch(self):
child_list = []
# 找出要分支的边
edge_to_branch =
self.choose_edge_to_branch(self.solved_continuous_routes_pool,
self.solved_continuous_coeff)
self.edge_handled.append(edge_to_branch)
# 左孩子中强制保留这条边
child = pybnb.Node()
new_ban_edges = self.ini_ban_edges + [edge_to_branch]
new_set_edges = self.ini_set_edges
child.state = (self.solved_continuous_routes_pool, new_ban_edges,
new_set_edges)
child_list.append(child)
# 右孩子中强制去除这条边
child = pybnb.Node()
new_ban_edges = self.ini_ban_edges
new_set_edges = self.ini_set_edges + [edge_to_branch]
child.state = (self.solved_continuous_routes_pool, new_ban_edges,
new_set_edges)
child_list.append(child)
return child_list
为了实现分支作用,我们可以单独定义一个函数update_distance,在执行objective()函数之前调用这个update_distance()函数:
def update_distance(self):
self.distance_local = copy.deepcopy(self.distance)
# 对于去除的边,把它的长度设置为一个很大的数,比如100000000
for ban_edge in self.ini_ban_edges:
self.distance_local[ban_edge[0]][ban_edge[1]] = self.largenum
# 对于必须保存的边(a,b), (a,x)和(x,b)的长度设置为很大的数
for set_edge in self.ini_set_edges:
for i in range(len(self.distance)):
if i != set_edge[0] and set_edge[1] != 0:
self.distance_local[i][set_edge[1]] = self.largenum
if i != set_edge[1] and set_edge[0] != 0:
self.distance_local[set_edge[0]][i] = self.largenum
objective()和bound()是整合的,意思是我们在求解当前节点时,主问题的LP松弛解就是bound,整数解就是objective,这两个过程实际上是捆绑在一起的。这部分代码比较长,大家可以在附件里看,这里就不贴了。到此为止我们已经把主要的流程介绍完毕,剩下就是具体实现了。为了避免大家没有安装商业 求解器,这份代码里的求解器我使用了开源求解器cbc,大家只需要安装orTools就行了,其内部集成了cbc。另外在萨洛蒙算例中随机取点的方式生成了一批20-90个点的新算例,算例文件一并放在了代码文件夹下。
1.本文以VRPTW求解为例,目的是介绍python代码的加速技巧,不是VRPTW的SOTA。
2.如果有兴趣在本文的方案上继续改进,则有如下的可能方向:
以下是使用10个进程求解的部分结果。pybnb是基于MPI的,windows用户可以搜索msmpi并下载安 装。安装完成后可以在命令行中运行
mpiexec -n 10 python XXX.py
-n指定进程数,XXX.py 为要运行的python文件名。以下是部分计算结果(Intel i5-10500 3.1GHz):
算例"30-0",求解时间5.32s.
算例"30-2",求解时间0.9s.
算例"40-1",求解时间2.08 s.
算例"40-2",求解时间1.97 s.
欲下载本文相关代码,请移步留言区
-The End-
如有需求,可以联系: 刘嵘(北京快成科技有限公司,buptrong@163.com,算法工程师(运筹优化算法在交通运输领域的实践,主要是MIP, CP)
秦虎老师(华中科技大学管理学院,微信号43340630)