前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >差分进化算法Python实现

差分进化算法Python实现

作者头像
里克贝斯
发布2021-05-21 16:05:29
1.3K0
发布2021-05-21 16:05:29
举报
文章被收录于专栏:图灵技术域图灵技术域

DE 算法主要用于求解连续变量的全局优化问题,其主要工作步骤与其他进化算法基本一致,主要包括变异(Mutation)、交叉(Crossover)、选择(Selection)三种操作。算法的基本思想是从某一随机产生的初始群体开始,利用从种群中随机选取的两个个体的差向量作为第三个个体的随机变化源,将差向量加权后按照一定的规则与第三个个体求和而产生变异个体,该操作称为变异。然后,变异个体与某个预先决定的目标个体进行参数混合,生成试验个体,这一过程称之为交叉。如果试验个体的适应度值优于目标个体的适应度值,则在下一代中试验个体取代目标个体,否则目标个体仍保存下来,该操作称为选择。在每一代的进化过程中,每一个体矢量作为目标个体一次,算法通过不断地迭代计算,保留优良个体,淘汰劣质个体,引导搜索过程向全局最优解逼近。

DE算法伪代码:

DE算法Python实现

代码语言:javascript
复制
from scitbx.array_family import flex
from stdlib import random
class differential_evolution_optimizer(object):

  def __init__(self,
               evaluator,
               population_size=50,
               f=None,
               cr=0.9,
               eps=1e-2,
               n_cross=1,
               max_iter=10000,
               monitor_cycle=200,
               out=None,
               show_progress=False,
               show_progress_nth_cycle=1,
               insert_solution_vector=None,
               dither_constant=0.4):
    self.dither=dither_constant
    self.show_progress=show_progress
    self.show_progress_nth_cycle=show_progress_nth_cycle
    self.evaluator = evaluator
    self.population_size = population_size
    self.f = f
    self.cr = cr
    self.n_cross = n_cross
    self.max_iter = max_iter
    self.monitor_cycle = monitor_cycle
    self.vector_length = evaluator.n
    self.eps = eps
    self.population = []
    self.seeded = False
    if insert_solution_vector is not None:
      assert len( insert_solution_vector )==self.vector_length
      self.seeded = insert_solution_vector
    for ii in xrange(self.population_size):
      self.population.append( flex.double(self.vector_length,0) )


    self.scores = flex.double(self.population_size,1000)
    self.optimize()
    self.best_score = flex.min( self.scores )
    self.best_vector = self.population[ flex.min_index( self.scores ) ]
    self.evaluator.x = self.best_vector
    if self.show_progress:
      self.evaluator.print_status(
            flex.min(self.scores),
            flex.mean(self.scores),
            self.population[ flex.min_index( self.scores ) ],
            'Final')


  def optimize(self):
    # initialise the population please
    self.make_random_population()
    # score the population please
    self.score_population()
    converged = False
    monitor_score = flex.min( self.scores )
    self.count = 0
    while not converged:
      self.evolve()
      location = flex.min_index( self.scores )
      if self.show_progress:
        if self.count%self.show_progress_nth_cycle==0:
          # make here a call to a custom print_status function in the evaluator function
          # the function signature should be (min_target, mean_target, best vector)
          self.evaluator.print_status(
            flex.min(self.scores),
            flex.mean(self.scores),
            self.population[ flex.min_index( self.scores ) ],
            self.count)

      self.count += 1
      if self.count%self.monitor_cycle==0:
        if (monitor_score-flex.min(self.scores) ) < self.eps:
          converged = True
        else:
         monitor_score = flex.min(self.scores)
      rd = (flex.mean(self.scores) - flex.min(self.scores) )
      rd = rd*rd/(flex.min(self.scores)*flex.min(self.scores) + self.eps )
      if ( rd < self.eps ):
        converged = True


      if self.count>=self.max_iter:
        converged =True

  def make_random_population(self):
    for ii in xrange(self.vector_length):
      delta  = self.evaluator.domain[ii][1]-self.evaluator.domain[ii][0]
      offset = self.evaluator.domain[ii][0]
      random_values = flex.random_double(self.population_size)
      random_values = random_values*delta+offset
      # now please place these values ni the proper places in the
      # vectors of the population we generated
      for vector, item in zip(self.population,random_values):
        vector[ii] = item
    if self.seeded is not False:
      self.population[0] = self.seeded

  def score_population(self):
    for vector,ii in zip(self.population,xrange(self.population_size)):
      tmp_score = self.evaluator.target(vector)
      self.scores[ii]=tmp_score

  def evolve(self):
    for ii in xrange(self.population_size):
      rnd = flex.random_double(self.population_size-1)
      permut = flex.sort_permutation(rnd)
      # make parent indices
      i1=permut[0]
      if (i1>=ii):
        i1+=1
      i2=permut[1]
      if (i2>=ii):
        i2+=1
      i3=permut[2]
      if (i3>=ii):
        i3+=1
      #
      x1 = self.population[ i1 ]
      x2 = self.population[ i2 ]
      x3 = self.population[ i3 ]

      if self.f is None:
        use_f = random.random()/2.0 + 0.5
      else:
        use_f = self.f

      vi = x1 + use_f*(x2-x3)
      # prepare the offspring vector please
      rnd = flex.random_double(self.vector_length)
      permut = flex.sort_permutation(rnd)
      test_vector = self.population[ii].deep_copy()
      # first the parameters that sure cross over
      for jj in xrange( self.vector_length  ):
        if (jj<self.n_cross):
          test_vector[ permut[jj] ] = vi[ permut[jj] ]
        else:
          if (rnd[jj]>self.cr):
            test_vector[ permut[jj] ] = vi[ permut[jj] ]
      # get the score please
      test_score = self.evaluator.target( test_vector )
      # check if the score if lower
      if test_score < self.scores[ii] :
        self.scores[ii] = test_score
        self.population[ii] = test_vector


  def show_population(self):
    print "+++++++++++++++++++++++++++++++++++++++++++++++++++++++++"
    for vec in self.population:
      print list(vec)
    print "+++++++++++++++++++++++++++++++++++++++++++++++++++++++++"


class test_function(object):
  def __init__(self):
    self.x = None
    self.n = 9
    self.domain = [ (-100,100) ]*self.n
    self.optimizer =  differential_evolution_optimizer(self,population_size=100,n_cross=5)
    assert flex.sum(self.x*self.x)<1e-5

  def target(self, vector):
    tmp = vector.deep_copy()
    result = (flex.sum(flex.cos(tmp*10))+self.n+1)*flex.sum( (tmp)*(tmp) )
    return result


class test_rosenbrock_function(object):
  def __init__(self, dim=5):
    self.x = None
    self.n = 2*dim
    self.dim = dim
    self.domain = [ (1,3) ]*self.n
    self.optimizer =  differential_evolution_optimizer(self,population_size=min(self.n*10,40),n_cross=self.n,cr=0.9, eps=1e-8, show_progress=True)
    print list(self.x)
    for x in self.x:
      assert abs(x-1.0)<1e-2


  def target(self, vector):
    tmp = vector.deep_copy()
    x_vec = vector[0:self.dim]
    y_vec = vector[self.dim:]
    result=0
    for x,y in zip(x_vec,y_vec):
      result+=100.0*((y-x*x)**2.0) + (1-x)**2.0
    #print list(x_vec), list(y_vec), result
    return result

  def print_status(self, mins,means,vector,txt):
    print txt,mins, means, list(vector)


def run():
  random.seed(0)
  flex.set_random_seed(0)
  test_rosenbrock_function(1)
  print("OK")

if __name__ == "__main__":
  run()
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-04-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • DE算法伪代码:
  • DE算法Python实现
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档